141 lines
6.0 KiB
PHP
141 lines
6.0 KiB
PHP
<?php
|
|
|
|
echo "[".date("Y-m-d H:i:s")."] GPS member_tracking_duration job is starting.\n";
|
|
|
|
require('../backend.php');
|
|
|
|
// We will not limit the SQL we will exhaust the allowed memory
|
|
// we will process data in ${hard_limit} batches and hopefully catch up
|
|
$hard_limit = 500000;
|
|
// GPS coordinate precision to assume the same location
|
|
$precision = 3;
|
|
|
|
$db_host = $savvyext->cfgReadChar('gpsdatabase.host');
|
|
$db_name = $savvyext->cfgReadChar('gpsdatabase.name');
|
|
$db_user = $savvyext->cfgReadChar('gpsdatabase.user');
|
|
$db_pass = $savvyext->cfgReadChar('gpsdatabase.pass');
|
|
$db_port = $savvyext->cfgReadLong('gpsdatabase.port');
|
|
$connstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}";
|
|
$conn = pg_connect($connstr);
|
|
|
|
$db_host = $savvyext->cfgReadChar('database_replica.host');
|
|
$db_name = $savvyext->cfgReadChar('database_replica.name');
|
|
$db_user = $savvyext->cfgReadChar('database_replica.user');
|
|
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
|
|
$db_port = $savvyext->cfgReadLong('database_replica.port');
|
|
$readOnlyReplicaConnstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}";
|
|
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);
|
|
|
|
$q = "select member_id,count(*) from members_tracking group by member_id";
|
|
$r = pg_query($readOnlyReplicaConn, $q);
|
|
if ($r && pg_num_rows($r)) {
|
|
while ($f=pg_fetch_row($r)) {
|
|
$member_id = $f[0];
|
|
echo "[".date("Y-m-d H:i:s")."] Processing member_id=".$member_id." (total member records: ".$f[1].")\n";
|
|
//if ($f[0]==22) continue; // Crash!
|
|
$p = processMemberGPSDataPartition($member_id);
|
|
echo "[".date("Y-m-d H:i:s")."] Processed member_id=".$member_id." (processed records: ".$p.")\n";
|
|
}
|
|
}
|
|
|
|
function processMemberGPSDataPartition($member_id) {
|
|
global $conn,$precision,$hard_limit,$readOnlyReplicaConn;
|
|
echo "[".date("Y-m-d H:i:s")."] processMemberGPSDataPartition($member_id)\n";
|
|
$id = 0;
|
|
$q = 'select max(last_id) from members_tracking_duration where member_id=' . $member_id;
|
|
$r = pg_query($readOnlyReplicaConn, $q);
|
|
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
|
$id = (int)$f[0];
|
|
}
|
|
echo "[".date("Y-m-d H:i:s")."] Last processed ID is ${id}\n";
|
|
/*
|
|
0 - id
|
|
1 - lat
|
|
2 - lng
|
|
3 - ttime
|
|
4 - member_id
|
|
5 - device_id
|
|
6 - gps
|
|
*/
|
|
$q = "select id,round(lat,${precision}) as lat,round(lng,${precision}) as lng,ttime,member_id,device_id,gps ";
|
|
$q.= " from members_tracking where id>${id} ";
|
|
$q.= " and member_id=".$member_id;
|
|
$q.= " order by member_id,device_id,ttime limit ${hard_limit}";
|
|
echo "\n$q\n";
|
|
$r = pg_query($readOnlyReplicaConn, $q);
|
|
|
|
if ($r && pg_num_rows($r)) {
|
|
echo "[".date("Y-m-d H:i:s")."] Records to process: ".pg_num_rows($r)."\n";
|
|
} else {
|
|
echo "[".date("Y-m-d H:i:s")."] No new records to process. Stop.\n";
|
|
return 0;
|
|
}
|
|
|
|
echo "[".date("Y-m-d H:i:s")."] Processing";
|
|
$first_data = NULL;
|
|
$last_data = NULL;
|
|
$pool_data = [];
|
|
$processed = 0;
|
|
while ($f=pg_fetch_row($r)) {
|
|
//echo ".";
|
|
if ($last_data!=NULL) {
|
|
// member id
|
|
if ($last_data[4]!=$f[4] || $last_data[5]!=$f[5] || // device_id
|
|
$last_data[1]!=$f[1] || $last_data[2]!=$f[2]) { // gps coordinates
|
|
// Process
|
|
$clear_data = [];
|
|
foreach ($pool_data as $id) {
|
|
if ($id!=$first_data[0] && $id!=$last_data[0]) {
|
|
$clear_data[] = $id;
|
|
}
|
|
}
|
|
$first = $clear_data[0] ?? $first_data;
|
|
$n = count($pool_data);
|
|
$last = array_pop($pool_data) ?? $last_data;
|
|
$q = "INSERT INTO members_tracking_duration (member_id,lat,lng,gps,first_id,last_id,first_time,last_time,duration,device_id,inner_duration,total) VALUES(";
|
|
$q.= $last_data[4].",".$last_data[1].",".$last_data[2].","; // member_id,lat,lng
|
|
$q.= "ST_SetSRID(ST_MakePoint(".$last_data[1].",".$last_data[2]."), 4326)::geography"; // GPS
|
|
$q.= ",".$first_data[0].",".$last_data[0].",'".$first_data[3]."','".$last_data[3]."',"; // fist_id,last_id,first_time,last_time,duration
|
|
$q.= "'".$last_data[3]."'::timestamp - '".$first_data[3]."'::timestamp,".($f[5]>0?$f[5]:"NULL"); // duration, device_id
|
|
$q.= "EXTRACT(epoch FROM '".$last[3]."':timestamp-'".$first[3]."'::timestamp),".$n.")"; // inner_duration, total
|
|
pg_query($conn, $q);
|
|
//echo pg_last_error()."\n";
|
|
|
|
if (count($clear_data)>0) {
|
|
$q1 = "DELETE FROM members_tracking WHERE id IN (".implode(",",$clear_data).")";
|
|
$r1 = pg_query($conn, $q1);
|
|
if ($r1 && pg_affected_rows($r1)) {
|
|
$q2 = "UPDATE members_tracking SET previous_id=".$first_data[0].",";
|
|
$q2.= "distance=ST_Distance(gps,'".$first_data[6]."'::geometry),";
|
|
$q2.= "duration=EXTRACT(epoch FROM ttime-'".$first_data[3]."'::timestamp)";
|
|
$q2.= " WHERE previous_id IN (".implode(",",$clear_data).",".$last_data[0].")";
|
|
$r2 = pg_query($conn, $q2);
|
|
echo "\n${q2}\n";
|
|
//echo pg_last_error()."\n";
|
|
} else {
|
|
echo "\n";
|
|
echo "[".date("Y-m-d H:i:s")."] Failed - ".pg_last_error()."\n";
|
|
echo "${q1}\n";
|
|
}
|
|
}
|
|
$first_data = NULL;
|
|
}
|
|
}
|
|
if ($first_data==NULL) {
|
|
$first_data = $f;
|
|
unset($pool_data);
|
|
$pool_data = [];
|
|
}
|
|
$pool_data[] = $f[0]; // id
|
|
$last_data = $f;
|
|
$processed++;
|
|
}
|
|
pg_free_result($r);
|
|
echo "\n";
|
|
return $processed;
|
|
}
|
|
|
|
pg_close($conn);
|
|
pg_close($readOnlyReplicaConn);
|
|
echo "[".date("Y-m-d H:i:s")."] GPS member_tracking_duration job complete.\n";
|