cfgReadChar('database.host'); $db_name = $savvyext->cfgReadChar('database.name'); $db_user = $savvyext->cfgReadChar('database.user'); $db_pass = $savvyext->cfgReadChar('database.pass'); $db_port = $savvyext->cfgReadLong('database.port'); $connstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}"; $con = pg_connect($connstr) or die("Could not connect to server\n"); $db_host = $savvyext->cfgReadChar('database_replica.host'); $db_name = $savvyext->cfgReadChar('database_replica.name'); $db_user = $savvyext->cfgReadChar('database_replica.user'); $db_pass = $savvyext->cfgReadChar('database_replica.pass'); $db_port = $savvyext->cfgReadLong('database_replica.port'); $readOnlyReplicaConnstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}"; $readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr); if($readOnlyReplicaConn === FALSE){ $readOnlyReplicaConn = $con; } if ($con === FALSE || $readOnlyReplicaConn === FALSE) { unlock_pid_file($lock_file); die("Could not connect to server\n"); } function execute($inX = []) { $outX = []; // get transaction list fetch_bank_transactions($inX, $outX); if ($outX['count']) { $response = $outX['results']; remove_transaction($response); // the rest of tranactions $import_transactions = array_filter($response, function ($ele) { return $ele['tstatus'] !== TRANSACTION_REMOVED; }); update_transaction($import_transactions, $imported_ids); insert_transaction($import_transactions, $imported_ids); } if (!empty($outX['next'])) { parse_str(parse_url($outX['next'], PHP_URL_QUERY), $inX); execute($inX); } else { return; } } function generate_params($data) { $value = []; foreach ($data as $element) { $value[] = [ 'import_id' => $element['id'], 'account' => $element['account'], 'member_id' => $element['member_id'], 'amount' => $element['amount'], 'currency' => $element['currency'], 'status' => $element['status'], 'tstatus' => $element['tstatus'], 'description' => $element['description'], 'time' => $element['time'], 'category' => $element['category'], 'provider_category' => $element['provider_category'], 'merchant_provider_id' => $element['merchant_provider_id'], 'merchant_name' => $element['merchant_name'], 'extra' => json_encode($element['extra']) ]; } return $value; } function fetch_bank_transactions($in, &$out) { global $savvyext; $target_url = str_replace("'", "", $savvyext->cfgReadChar('microservices.account')) . "/api/v1/transactions/get_raw_transactions"; $target_url = $target_url . "/?" . http_build_query($in); //open connection $ch = curl_init(); //set the url, number of POST vars, POST data curl_setopt($ch, CURLOPT_URL, $target_url); curl_setopt($ch, CURLOPT_HTTPHEADER, array( 'Content-Type: application/json', 'Connection: Keep-Alive', 'Authorization: Server-Token ' . $savvyext->cfgReadChar('system.bank_token'), 'Accept: application/json' )); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_FAILONERROR, true); //execute post $result = curl_exec($ch); if (curl_errno($ch)) { $error_msg = curl_error($ch); } //close connection curl_close($ch); if (isset($error_msg)) { Logger::debug('transactions_record_import fetch_bank_transactions error:'); Logger::debug($error_msg); } $out = json_decode($result, true); } function get_import_id_list_in_table($import_ids) { global $readOnlyReplicaConn; $query = 'SELECT import_id FROM members_transactions_import_raw WHERE import_id IN (' . implode(',', $import_ids) . ')'; $result = pg_query($readOnlyReplicaConn, $query); $imported_ids = []; if ($result && pg_num_rows($result)) { while ($row = pg_fetch_row($result)) { $imported_ids[] = $row[0]; } } return $imported_ids; } function delete_tranasction($remove_ids) { global $con; $query = 'DELETE FROM members_transactions_import_raw WHERE import_id IN (' . implode(',', $remove_ids) . ')'; if (!pg_query($con, $query)) { echo pg_last_error(); Logger::debug('transactions_record_import delete_tranasction error:'); Logger::debug(pg_last_error()); return false; } return true; } function mark_transaction_as_imported($id) { global $savvyext; $target_url = str_replace("'", "", $savvyext->cfgReadChar('microservices.account')) . "/api/v1/transactions/{$id}/"; //open connection $ch = curl_init(); //set the url, number of POST vars, POST data curl_setopt($ch, CURLOPT_URL, $target_url); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PATCH'); curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode( ['points_imported' => date('Y-m-d\TH:i:s.Z\Z', time())] )); curl_setopt($ch, CURLOPT_HTTPHEADER, array( 'Content-Type: application/json', 'Connection: Keep-Alive', 'Authorization: Server-Token ' . $savvyext->cfgReadChar('system.bank_token'), 'Accept: application/json' )); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_FAILONERROR, true); //execute post $result = curl_exec($ch); if (curl_errno($ch)) { $error_msg = curl_error($ch); } //close connection curl_close($ch); if (isset($error_msg)) { Logger::debug('transactions_record_import mark_transaction_as_imported error:'); Logger::debug($error_msg); } $out = json_decode($result, true); if (isset($out['error'])) echo implode("\n",$out['non_field_errors']); } function remove_transaction($data) { // transactions removed $remove_ids = array_column(array_filter($data, function ($ele) { return $ele['tstatus'] === TRANSACTION_REMOVED; }), 'id'); if ($remove_ids && delete_tranasction($remove_ids)) { foreach ($remove_ids as $id) { mark_transaction_as_imported($id); } } } function update_transaction($data, &$imported_ids) { global $con; // get transactions imported $imported_ids = get_import_id_list_in_table(array_column($data, 'id')); if ($imported_ids) { $imported_transactions = array_filter($data, function ($ele) use ($imported_ids) { return in_array($ele['id'], $imported_ids); }); $params_update = generate_params($imported_transactions); foreach ($params_update as $val) { $rs = pg_query_params($con, "UPDATE members_transactions_import_raw SET account = $2, member_id = $3, amount = $4, currency = $5, status = $6, tstatus = $7, description = $8, time = $9, category = $10, provider_category = $11, merchant_provider_id = $12, merchant_name = $13, extra = $14 WHERE import_id = $1 ", $val ); if (!$rs) { echo pg_last_error(); Logger::debug('transactions_record_import update_transaction error:'); Logger::debug(pg_last_error()); } else { mark_transaction_as_imported($val['import_id']); } } } } function insert_transaction($data, $imported_ids) { global $con; // get the transactions not importedimported_ids $import_transactions = array_filter($data, function ($ele) use ($imported_ids) { return !in_array($ele['id'], $imported_ids); }); $params_insert = generate_params($import_transactions); foreach ($params_insert as $val) { $rs = pg_query_params($con, "INSERT INTO members_transactions_import_raw ( import_id, account, member_id, amount, currency, status, tstatus, description, time, category, provider_category, merchant_provider_id, merchant_name, extra ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) ON CONFLICT(import_id) DO NOTHING", $val ); if (!$rs) { echo "Cannot execute query with import id:" . $val['id']; // import_id Logger::debug('transactions_record_import insert_transaction error:'); Logger::debug(pg_last_error()); } else { mark_transaction_as_imported($val['import_id']); } } } $lock_file = lock_pid_file(); execute(); unlock_pid_file($lock_file); echo "[" . date("Y-m-d H:i:s") . "] " . $job_name . " job complete.\n";