319 lines
9.0 KiB
PHP
319 lines
9.0 KiB
PHP
<?php
|
|
|
|
// Value of a transaction's `tstatus` field that marks it as removed upstream.
define('TRANSACTION_REMOVED', 'removed');

// The job name is the script file name without its extension; used in log lines.
$job_name = pathinfo(__FILE__, PATHINFO_FILENAME);
echo "[" . date("Y-m-d H:i:s") . "] " . $job_name . " job is starting.\n";

// NOTE(review): $savvyext is presumably provided by backend.php - confirm.
include '../backend.php';
require_once('./lock.php');
require_once('./common/Logger.php');

// Primary (read/write) database connection. The script stops here if it cannot be opened.
$db_host = $savvyext->cfgReadChar('database.host');
$db_name = $savvyext->cfgReadChar('database.name');
$db_user = $savvyext->cfgReadChar('database.user');
$db_pass = $savvyext->cfgReadChar('database.pass');
$db_port = $savvyext->cfgReadLong('database.port');

$connstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$con = pg_connect($connstr) or die("Could not connect to server\n");

// Read-only replica connection, used for lookups.
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');

$readOnlyReplicaConnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);

// If the replica is unavailable, read from the primary instead.
if ($readOnlyReplicaConn === false) {
    $readOnlyReplicaConn = $con;
}

// No further connection check is needed: a failed primary connection has already
// terminated the script above, and the replica handle always falls back to the primary.
|
|
|
|
/**
 * Import bank transactions page by page.
 *
 * For every page returned by fetch_bank_transactions():
 *   1. transactions whose `tstatus` is TRANSACTION_REMOVED are deleted locally,
 *   2. the remaining transactions are updated when already present and inserted otherwise.
 * The query string of the page's `next` URL becomes the parameters of the following request.
 *
 * Pages are processed in a loop rather than by recursion so that a long result set
 * does not grow the call stack.
 *
 * @param array $inX Query parameters for the first request (empty for the first page).
 * @return void
 */
function execute($inX = [])
{
    while (true) {
        $outX = [];

        // get transaction list
        fetch_bank_transactions($inX, $outX);

        // A failed request or an undecodable body leaves nothing to process.
        if (!is_array($outX)) {
            return;
        }

        if (!empty($outX['count']) && !empty($outX['results']) && is_array($outX['results'])) {
            $response = $outX['results'];

            remove_transaction($response);

            // the rest of the transactions
            $import_transactions = array_filter($response, function ($ele) {
                return $ele['tstatus'] !== TRANSACTION_REMOVED;
            });

            // Skip the database round trips when the page only contained removals;
            // an empty id list would otherwise produce an invalid "IN ()" lookup.
            if ($import_transactions) {
                $imported_ids = [];
                update_transaction($import_transactions, $imported_ids);
                insert_transaction($import_transactions, $imported_ids);
            }
        }

        if (empty($outX['next']) || !is_string($outX['next'])) {
            return;
        }

        $next_query = parse_url($outX['next'], PHP_URL_QUERY);
        if (!is_string($next_query) || $next_query === '') {
            // A "next" link without a query string would restart from the first page forever.
            return;
        }

        $next_params = [];
        parse_str($next_query, $next_params);

        // Stop if the service points back at the page that was just processed.
        if ($next_params == $inX) {
            return;
        }

        $inX = $next_params;
    }
}
|
|
|
|
/**
 * Convert raw transaction records into parameter rows for the import table queries.
 *
 * Each row starts with `import_id` (taken from the record's `id`), followed by the
 * copied columns, and ends with `extra` encoded as JSON. The key order matters:
 * callers pass each row to pg_query_params(), where values are bound positionally.
 *
 * @param array $data Transaction records.
 * @return array List of parameter rows, re-indexed from 0.
 */
function generate_params($data)
{
    // Columns copied verbatim, in the order the SQL placeholders expect them.
    $copied_columns = [
        'account',
        'member_id',
        'amount',
        'currency',
        'status',
        'tstatus',
        'description',
        'time',
        'category',
        'provider_category',
        'merchant_provider_id',
        'merchant_name',
    ];

    $rows = [];

    foreach ($data as $transaction) {
        $row = ['import_id' => $transaction['id']];

        foreach ($copied_columns as $column) {
            $row[$column] = $transaction[$column];
        }

        $row['extra'] = json_encode($transaction['extra']);

        $rows[] = $row;
    }

    return $rows;
}
|
|
|
|
/**
 * Fetch one page of raw transactions from the account microservice.
 *
 * Sends a GET request to `<microservices.account>/api/v1/transactions/get_raw_transactions/`
 * with `$in` as the query string and stores the decoded JSON body in `$out`.
 *
 * When the request fails or the body is not a JSON object/array, the problem is logged
 * and `$out` is set to an empty page (`count` 0, no `results`, no `next`) so that callers
 * can always treat it as an array.
 *
 * @param array $in  Query parameters for the request.
 * @param mixed $out Receives the decoded response (always an array).
 * @return void
 */
function fetch_bank_transactions($in, &$out)
{
    global $savvyext;

    $empty_page = ['count' => 0, 'results' => [], 'next' => null];

    // Single quotes are stripped from the configured base URL before it is used.
    $target_url = str_replace("'", "", $savvyext->cfgReadChar('microservices.account'))
        . "/api/v1/transactions/get_raw_transactions";

    $target_url = $target_url . "/?" . http_build_query($in);

    // open connection
    $ch = curl_init();

    curl_setopt($ch, CURLOPT_URL, $target_url);
    curl_setopt($ch, CURLOPT_HTTPHEADER, array(
        'Content-Type: application/json',
        'Connection: Keep-Alive',
        'Authorization: Server-Token ' . $savvyext->cfgReadChar('system.bank_token'),
        'Accept: application/json'
    ));
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    // NOTE(review): peer certificate verification is disabled while a server token is
    // sent in the Authorization header - confirm this is intended for this endpoint.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    // Treat HTTP status codes >= 400 as transfer errors.
    curl_setopt($ch, CURLOPT_FAILONERROR, true);

    // execute request
    $result = curl_exec($ch);
    $error_msg = curl_errno($ch) ? curl_error($ch) : null;

    // close connection
    curl_close($ch);

    if ($error_msg !== null) {
        Logger::debug('transactions_record_import fetch_bank_transactions error:');
        Logger::debug($error_msg);
    }

    if (!is_string($result)) {
        $out = $empty_page;
        return;
    }

    $decoded = json_decode($result, true);

    if (!is_array($decoded)) {
        Logger::debug('transactions_record_import fetch_bank_transactions error:');
        Logger::debug('Unexpected response body: ' . json_last_error_msg());
        $out = $empty_page;
        return;
    }

    $out = $decoded;
}
|
|
|
|
/**
 * Return the subset of the given import ids that already exist in
 * members_transactions_import_raw. The lookup runs on the read-only replica connection.
 *
 * The ids are bound as query parameters instead of being concatenated into the SQL,
 * and an empty list returns immediately (it would otherwise build an invalid "IN ()").
 *
 * @param array $import_ids Import ids to look up.
 * @return array Ids found in the table, as returned by the database driver (strings).
 */
function get_import_id_list_in_table($import_ids)
{
    global $readOnlyReplicaConn;

    if (empty($import_ids)) {
        return [];
    }

    // Re-index so that placeholder numbers line up with parameter positions.
    $ids = array_values($import_ids);

    $placeholders = [];
    foreach ($ids as $position => $unused) {
        $placeholders[] = '$' . ($position + 1);
    }

    $query =
        'SELECT import_id
        FROM members_transactions_import_raw
        WHERE import_id IN (' . implode(',', $placeholders) . ')';

    $result = pg_query_params($readOnlyReplicaConn, $query, $ids);

    $imported_ids = [];

    if (!$result) {
        Logger::debug('transactions_record_import get_import_id_list_in_table error:');
        Logger::debug(pg_last_error($readOnlyReplicaConn));
        return $imported_ids;
    }

    while ($row = pg_fetch_row($result)) {
        $imported_ids[] = $row[0];
    }

    return $imported_ids;
}
|
|
|
|
/**
 * Delete the rows with the given import ids from members_transactions_import_raw.
 *
 * The ids are bound as query parameters instead of being concatenated into the SQL.
 * Errors are reported from the primary connection that ran the statement.
 *
 * @param array $remove_ids Import ids to delete.
 * @return bool True when the delete succeeded (or there was nothing to delete),
 *              false when the query failed.
 */
function delete_tranasction($remove_ids)
{
    global $con;

    // Nothing to delete; avoid building an invalid "IN ()" clause.
    if (empty($remove_ids)) {
        return true;
    }

    // Re-index so that placeholder numbers line up with parameter positions.
    $ids = array_values($remove_ids);

    $placeholders = [];
    foreach ($ids as $position => $unused) {
        $placeholders[] = '$' . ($position + 1);
    }

    $query = 'DELETE FROM members_transactions_import_raw
        WHERE import_id IN (' . implode(',', $placeholders) . ')';

    if (!pg_query_params($con, $query, $ids)) {
        // Ask for the error on $con explicitly: without an argument pg_last_error()
        // reads the default connection, which is the most recently opened one.
        $error = pg_last_error($con);

        echo $error;
        Logger::debug('transactions_record_import delete_tranasction error:');
        Logger::debug($error);

        return false;
    }

    return true;
}
|
|
|
|
/**
 * Tell the account microservice that a transaction has been imported.
 *
 * Sends a PATCH request to `<microservices.account>/api/v1/transactions/{id}/` with a
 * JSON body containing `points_imported` set to the current UTC time
 * (ISO 8601, e.g. 2024-01-31T12:00:00Z).
 *
 * Transfer errors are logged. If the decoded response has an `error` key, any
 * `non_field_errors` it carries are printed, one per line.
 *
 * @param mixed $id Transaction id used in the URL.
 * @return void
 */
function mark_transaction_as_imported($id)
{
    global $savvyext;

    // Single quotes are stripped from the configured base URL before it is used.
    $target_url = str_replace("'", "", $savvyext->cfgReadChar('microservices.account'))
        . "/api/v1/transactions/{$id}/";

    // open connection
    $ch = curl_init();

    curl_setopt($ch, CURLOPT_URL, $target_url);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PATCH');
    // gmdate() is used because the trailing "Z" denotes UTC. The date() format
    // character "Z" is the timezone offset in seconds, not a fraction of a second.
    curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode(
        ['points_imported' => gmdate('Y-m-d\TH:i:s\Z')]
    ));
    curl_setopt($ch, CURLOPT_HTTPHEADER, array(
        'Content-Type: application/json',
        'Connection: Keep-Alive',
        'Authorization: Server-Token ' . $savvyext->cfgReadChar('system.bank_token'),
        'Accept: application/json'
    ));
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    // NOTE(review): peer certificate verification is disabled while a server token is
    // sent in the Authorization header - confirm this is intended for this endpoint.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    // Treat HTTP status codes >= 400 as transfer errors.
    curl_setopt($ch, CURLOPT_FAILONERROR, true);

    // execute request
    $result = curl_exec($ch);
    $error_msg = curl_errno($ch) ? curl_error($ch) : null;

    // close connection
    curl_close($ch);

    if ($error_msg !== null) {
        Logger::debug('transactions_record_import mark_transaction_as_imported error:');
        Logger::debug($error_msg);
    }

    // A failed transfer returns false from curl_exec(); there is no body to inspect.
    if (!is_string($result)) {
        return;
    }

    $out = json_decode($result, true);

    if (
        is_array($out)
        && isset($out['error'])
        && isset($out['non_field_errors'])
        && is_array($out['non_field_errors'])
    ) {
        echo implode("\n", $out['non_field_errors']);
    }
}
|
|
|
|
/**
 * Delete locally every transaction in $data whose `tstatus` is TRANSACTION_REMOVED,
 * then report each deleted id through mark_transaction_as_imported().
 *
 * Nothing is reported when there are no removed transactions or the delete fails.
 *
 * @param array $data Transaction records.
 * @return void
 */
function remove_transaction($data)
{
    $is_removed = function ($transaction) {
        return $transaction['tstatus'] === TRANSACTION_REMOVED;
    };

    // transactions removed
    $removed_transactions = array_filter($data, $is_removed);
    $remove_ids = array_column($removed_transactions, 'id');

    if (!$remove_ids) {
        return;
    }

    if (!delete_tranasction($remove_ids)) {
        return;
    }

    foreach ($remove_ids as $removed_id) {
        mark_transaction_as_imported($removed_id);
    }
}
|
|
|
|
/**
 * Update the transactions from $data that already exist in members_transactions_import_raw.
 *
 * The ids already present in the table are looked up first and handed back through
 * $imported_ids. Each matching transaction is then updated, and every successful
 * update is reported through mark_transaction_as_imported().
 *
 * @param array $data         Transaction records.
 * @param mixed $imported_ids Receives the list of ids already present in the table.
 * @return void
 */
function update_transaction($data, &$imported_ids)
{
    global $con;

    // Nothing to look up; an empty id list would build an invalid "IN ()" query.
    if (empty($data)) {
        $imported_ids = [];
        return;
    }

    // get transactions imported
    $imported_ids = get_import_id_list_in_table(array_column($data, 'id'));

    if (!$imported_ids) {
        return;
    }

    // Loose comparison is deliberate: ids read back from the database are strings.
    $imported_transactions = array_filter($data, function ($ele) use ($imported_ids) {
        return in_array($ele['id'], $imported_ids);
    });

    $params_update = generate_params($imported_transactions);

    foreach ($params_update as $val) {
        // $val is bound positionally: import_id is $1, the remaining columns are $2..$14.
        $rs = pg_query_params(
            $con,
            "UPDATE members_transactions_import_raw
            SET
                account = $2,
                member_id = $3,
                amount = $4,
                currency = $5,
                status = $6,
                tstatus = $7,
                description = $8,
                time = $9,
                category = $10,
                provider_category = $11,
                merchant_provider_id = $12,
                merchant_name = $13,
                extra = $14
            WHERE import_id = $1
            ",
            $val
        );

        if (!$rs) {
            // Ask for the error on $con explicitly: without an argument pg_last_error()
            // reads the default connection, which is the most recently opened one.
            $error = pg_last_error($con);

            echo $error;
            Logger::debug('transactions_record_import update_transaction error:');
            Logger::debug($error);
        } else {
            mark_transaction_as_imported($val['import_id']);
        }
    }
}
|
|
|
|
/**
 * Insert the transactions from $data that are not yet in members_transactions_import_raw.
 *
 * Transactions whose id appears in $imported_ids are skipped. Rows that collide on
 * import_id are left untouched by the statement (ON CONFLICT DO NOTHING). Every
 * statement that succeeds is reported through mark_transaction_as_imported().
 *
 * @param array $data         Transaction records.
 * @param array $imported_ids Ids already present in the table.
 * @return void
 */
function insert_transaction($data, $imported_ids)
{
    global $con;

    // get the transactions not imported yet
    // Loose comparison is deliberate: ids read back from the database are strings.
    $import_transactions = array_filter($data, function ($ele) use ($imported_ids) {
        return !in_array($ele['id'], $imported_ids);
    });

    if (!$import_transactions) {
        return;
    }

    $params_insert = generate_params($import_transactions);

    foreach ($params_insert as $val) {
        // $val is bound positionally, in the same order as the column list below.
        $rs = pg_query_params(
            $con,
            "INSERT INTO members_transactions_import_raw
            (
                import_id,
                account,
                member_id,
                amount,
                currency,
                status,
                tstatus,
                description,
                time,
                category,
                provider_category,
                merchant_provider_id,
                merchant_name,
                extra
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
            ON CONFLICT(import_id) DO NOTHING",
            $val
        );

        if (!$rs) {
            // Parameter rows carry the id under 'import_id'; they have no 'id' key.
            echo "Cannot execute query with import id:" . $val['import_id'];
            Logger::debug('transactions_record_import insert_transaction error:');
            // Ask for the error on $con explicitly: without an argument pg_last_error()
            // reads the default connection, which is the most recently opened one.
            Logger::debug(pg_last_error($con));
        } else {
            mark_transaction_as_imported($val['import_id']);
        }
    }
}
|
|
|
|
// Hold the PID lock for the duration of the import so that runs do not overlap.
$lock_file = lock_pid_file();

try {
    execute();
} finally {
    // Release the lock even when the import throws, so the next run is not blocked.
    unlock_pid_file($lock_file);
}

echo "[" . date("Y-m-d H:i:s") . "] " . $job_name . " job complete.\n";
|