Files
FloatBackOfffice/CRONS/member_tracking_duration.php
dev-chiefworks f76abffdcd first commit
2022-05-31 16:21:53 -04:00

141 lines
6.0 KiB
PHP

<?php
echo "[".date("Y-m-d H:i:s")."] GPS member_tracking_duration job is starting.\n";
require('../backend.php');
// We will not limit the SQL we will exhaust the allowed memory
// we will process data in ${hard_limit} batches and hopefully catch up
$hard_limit = 500000;
// GPS coordinate precision to assume the same location
$precision = 3;
$db_host = $savvyext->cfgReadChar('gpsdatabase.host');
$db_name = $savvyext->cfgReadChar('gpsdatabase.name');
$db_user = $savvyext->cfgReadChar('gpsdatabase.user');
$db_pass = $savvyext->cfgReadChar('gpsdatabase.pass');
$db_port = $savvyext->cfgReadLong('gpsdatabase.port');
$connstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}";
$conn = pg_connect($connstr);
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');
$readOnlyReplicaConnstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}";
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);
$q = "select member_id,count(*) from members_tracking group by member_id";
$r = pg_query($readOnlyReplicaConn, $q);
if ($r && pg_num_rows($r)) {
while ($f=pg_fetch_row($r)) {
$member_id = $f[0];
echo "[".date("Y-m-d H:i:s")."] Processing member_id=".$member_id." (total member records: ".$f[1].")\n";
//if ($f[0]==22) continue; // Crash!
$p = processMemberGPSDataPartition($member_id);
echo "[".date("Y-m-d H:i:s")."] Processed member_id=".$member_id." (processed records: ".$p.")\n";
}
}
function processMemberGPSDataPartition($member_id) {
global $conn,$precision,$hard_limit,$readOnlyReplicaConn;
echo "[".date("Y-m-d H:i:s")."] processMemberGPSDataPartition($member_id)\n";
$id = 0;
$q = 'select max(last_id) from members_tracking_duration where member_id=' . $member_id;
$r = pg_query($readOnlyReplicaConn, $q);
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
$id = (int)$f[0];
}
echo "[".date("Y-m-d H:i:s")."] Last processed ID is ${id}\n";
/*
0 - id
1 - lat
2 - lng
3 - ttime
4 - member_id
5 - device_id
6 - gps
*/
$q = "select id,round(lat,${precision}) as lat,round(lng,${precision}) as lng,ttime,member_id,device_id,gps ";
$q.= " from members_tracking where id>${id} ";
$q.= " and member_id=".$member_id;
$q.= " order by member_id,device_id,ttime limit ${hard_limit}";
echo "\n$q\n";
$r = pg_query($readOnlyReplicaConn, $q);
if ($r && pg_num_rows($r)) {
echo "[".date("Y-m-d H:i:s")."] Records to process: ".pg_num_rows($r)."\n";
} else {
echo "[".date("Y-m-d H:i:s")."] No new records to process. Stop.\n";
return 0;
}
echo "[".date("Y-m-d H:i:s")."] Processing";
$first_data = NULL;
$last_data = NULL;
$pool_data = [];
$processed = 0;
while ($f=pg_fetch_row($r)) {
//echo ".";
if ($last_data!=NULL) {
// member id
if ($last_data[4]!=$f[4] || $last_data[5]!=$f[5] || // device_id
$last_data[1]!=$f[1] || $last_data[2]!=$f[2]) { // gps coordinates
// Process
$clear_data = [];
foreach ($pool_data as $id) {
if ($id!=$first_data[0] && $id!=$last_data[0]) {
$clear_data[] = $id;
}
}
$first = $clear_data[0] ?? $first_data;
$n = count($pool_data);
$last = array_pop($pool_data) ?? $last_data;
$q = "INSERT INTO members_tracking_duration (member_id,lat,lng,gps,first_id,last_id,first_time,last_time,duration,device_id,inner_duration,total) VALUES(";
$q.= $last_data[4].",".$last_data[1].",".$last_data[2].","; // member_id,lat,lng
$q.= "ST_SetSRID(ST_MakePoint(".$last_data[1].",".$last_data[2]."), 4326)::geography"; // GPS
$q.= ",".$first_data[0].",".$last_data[0].",'".$first_data[3]."','".$last_data[3]."',"; // fist_id,last_id,first_time,last_time,duration
$q.= "'".$last_data[3]."'::timestamp - '".$first_data[3]."'::timestamp,".($f[5]>0?$f[5]:"NULL"); // duration, device_id
$q.= "EXTRACT(epoch FROM '".$last[3]."':timestamp-'".$first[3]."'::timestamp),".$n.")"; // inner_duration, total
pg_query($conn, $q);
//echo pg_last_error()."\n";
if (count($clear_data)>0) {
$q1 = "DELETE FROM members_tracking WHERE id IN (".implode(",",$clear_data).")";
$r1 = pg_query($conn, $q1);
if ($r1 && pg_affected_rows($r1)) {
$q2 = "UPDATE members_tracking SET previous_id=".$first_data[0].",";
$q2.= "distance=ST_Distance(gps,'".$first_data[6]."'::geometry),";
$q2.= "duration=EXTRACT(epoch FROM ttime-'".$first_data[3]."'::timestamp)";
$q2.= " WHERE previous_id IN (".implode(",",$clear_data).",".$last_data[0].")";
$r2 = pg_query($conn, $q2);
echo "\n${q2}\n";
//echo pg_last_error()."\n";
} else {
echo "\n";
echo "[".date("Y-m-d H:i:s")."] Failed - ".pg_last_error()."\n";
echo "${q1}\n";
}
}
$first_data = NULL;
}
}
if ($first_data==NULL) {
$first_data = $f;
unset($pool_data);
$pool_data = [];
}
$pool_data[] = $f[0]; // id
$last_data = $f;
$processed++;
}
pg_free_result($r);
echo "\n";
return $processed;
}
pg_close($conn);
pg_close($readOnlyReplicaConn);
echo "[".date("Y-m-d H:i:s")."] GPS member_tracking_duration job complete.\n";