Files
FloatBackOfffice/CRONS/member_tracking_trips.php
dev-chiefworks f76abffdcd first commit
2022-05-31 16:21:53 -04:00

318 lines
13 KiB
PHP

<?php
echo "[".date("Y-m-d H:i:s")."] GPS members_tracking_trips job is starting.\n";
require('../backend.php');

// ---------------------------------------------------------------------------
// Tunables. The functions below read these through `global`.
// ---------------------------------------------------------------------------
// The SQL is always limited: an unbounded result set would exhaust the allowed
// memory. Each run handles at most $hard_limit rows per member and later runs
// are expected to catch up.
$hard_limit = 500000;
// GPS coordinate precision to assume the same location
$precision = 3;
// How many seconds is a stop?
$stop_limit = 120;

// Token sent in the Authorization header of the reverse-geocoding HTTP call.
$httpAuthToken = $savvyext->cfgReadChar('system.oauth2_token');

// ---------------------------------------------------------------------------
// Connection used for address lookups (configured under database_replica.*).
// ---------------------------------------------------------------------------
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');
$readOnlyReplicaConnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);
if ($readOnlyReplicaConn === false) {
    // Every address lookup needs this connection; fail loudly instead of
    // erroring on the first query issued against an invalid handle.
    echo "[".date("Y-m-d H:i:s")."] Cannot connect to the replica database. Abort.\n";
    exit(1);
}

// ---------------------------------------------------------------------------
// Connection to the GPS database (members_tracking* tables).
// ---------------------------------------------------------------------------
$db_host = $savvyext->cfgReadChar('gpsdatabase.host');
$db_name = $savvyext->cfgReadChar('gpsdatabase.name');
$db_user = $savvyext->cfgReadChar('gpsdatabase.user');
$db_pass = $savvyext->cfgReadChar('gpsdatabase.pass');
$db_port = $savvyext->cfgReadLong('gpsdatabase.port');
$gpsconnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$gpsconn = pg_connect($gpsconnstr);
if ($gpsconn === false) {
    echo "[".date("Y-m-d H:i:s")."] Cannot connect to the GPS database. Abort.\n";
    pg_close($readOnlyReplicaConn);
    exit(1);
}

// ---------------------------------------------------------------------------
// Process every member that has tracking records.
// ---------------------------------------------------------------------------
$q = "select member_id,count(*) from members_tracking ";
//$q.= "where member_id=55 "; // DEBUG
$q.= "group by member_id";
$r = pg_query($gpsconn, $q);
if ($r === false) {
    // Previously a failed query was skipped silently; make it visible in the job log.
    echo "[".date("Y-m-d H:i:s")."] Failed to list members: ".pg_last_error($gpsconn)."\n";
} else {
    while ($f = pg_fetch_row($r)) {
        $member_id = $f[0];
        echo "[".date("Y-m-d H:i:s")."] Processing member_id=".$member_id." (total member records: ".$f[1].")\n";
        $p = processMemberGPSTrackingTrips($member_id);
        echo "[".date("Y-m-d H:i:s")."] Processed member_id=".$member_id." (processed records: ".$p.")\n";
    }
    pg_free_result($r);
}
/**
 * Turn a member's not-yet-processed members_tracking_duration rows into trips.
 *
 * Rows newer than the member's last saved trip are read ordered by
 * (device_id, first_time), at most $hard_limit per call. Consecutive rows are
 * accumulated into one trip; the trip is handed to
 * processMemberGPSTrackingTripsSave() when a row's inner_duration reaches
 * $stop_limit or when the device_id changes.
 *
 * NOTE(review): rows accumulated after the last boundary are not saved when
 * the result set ends. Presumably such a trip is considered still in progress
 * and is re-read on the next run (the resume id comes from saved trips only) -
 * confirm. A single trip longer than $hard_limit rows would never be saved.
 *
 * @param int|string $member_id Member whose tracking rows are processed.
 * @return int Number of rows read from the result set (0 when nothing is new).
 */
function processMemberGPSTrackingTrips($member_id) {
    global $gpsconn, $hard_limit, $stop_limit;

    echo "[".date("Y-m-d H:i:s")."] processMemberGPSDataPartition($member_id)\n";

    // Only integers are ever interpolated into the SQL below.
    $db_member_id = (int)$member_id;
    $db_limit = (int)$hard_limit;

    // Resume after the highest tracking id referenced by an already saved trip.
    $id = 0;
    $q = 'select max(location_start),max(location_end) from members_tracking_trips where member_id=' . $db_member_id;
    $r = pg_query($gpsconn, $q);
    if ($r) {
        if (pg_num_rows($r) && $f = pg_fetch_row($r)) {
            $id = max((int)$f[0], (int)$f[1]);
        }
        pg_free_result($r);
    }
    echo "[".date("Y-m-d H:i:s")."] Last processed ID is {$id}\n";

    $q = "select a.first_id,a.last_id,a.device_id,extract(milliseconds from a.duration) as ms, ";
    $q.= "extract(epoch from a.duration) as seconds, c.speed, a.inner_duration, a.total, ";
    $q.= "ST_Distance(c.gps,b.gps) AS distance, b.gps AS gps_start, c.gps AS gps_end,";
    $q.= "EXTRACT(epoch FROM c.ttime-b.ttime) AS travel_time ";
    $q.= "from members_tracking_duration a ";
    $q.= "left join members_tracking b on (b.id=a.first_id) ";
    $q.= "left join members_tracking c on (c.id=a.last_id) ";
    $q.= "where a.member_id={$db_member_id} ";
    $q.= "and a.first_id>{$id} AND a.last_id>{$id} "; // Limit ?
    $q.= "order by a.device_id, a.first_time limit {$db_limit}";
    //echo "\n$q\n";

    // Run the (potentially very large) query once and reuse the result for
    // both the row count and the processing loop.
    $r = pg_query($gpsconn, $q);
    if (!$r || !pg_num_rows($r)) {
        if ($r) {
            pg_free_result($r);
        }
        echo "[".date("Y-m-d H:i:s")."] No new records to process. Stop.\n";
        return 0;
    }
    echo "[".date("Y-m-d H:i:s")."] Records to process: ".pg_num_rows($r)."\n";
    echo "[".date("Y-m-d H:i:s")."] Processing";

    $empty_totals = ['distance'=>0,'duration'=>0,'total'=>0,'speed'=>0];
    $trip = [];                  // rows of the trip currently being assembled
    $trip_data = $empty_totals;  // running totals for that trip
    $processed = 0;
    $last_device_id = -1;

    while ($f = pg_fetch_assoc($r)) {
        // NOTE(review): inner_duration is compared to $stop_limit as a number
        // of seconds - confirm the column really holds seconds.
        $is_stop = $f["inner_duration"] >= $stop_limit;
        $device_changed = ($last_device_id != $f["device_id"]);

        if ($is_stop || ($device_changed && count($trip) > 0)) {
            // Nothing to save when two boundaries follow each other.
            if (count($trip) > 0) {
                list ($res, $err) = processMemberGPSTrackingTripsSave(
                    $member_id,
                    $trip[0],
                    $trip[count($trip) - 1],
                    $trip_data);
                if ($res && $res["id"]>0) {
                    echo "[".date("Y-m-d H:i:s")."] New trip processed: ".$res["id"]." (member_id={$member_id})\n";
                } else {
                    echo "[".date("Y-m-d H:i:s")."] Failed to process trip: {$err} (member_id={$member_id})\n";
                    var_dump($trip_data); // DEBUG
                }
            }
            $trip = [];
            $trip_data = $empty_totals;
        }

        // A stop row only terminates a trip. Any other row - including the
        // first row of a new device, which used to be dropped - belongs to
        // the trip being assembled.
        if (!$is_stop) {
            $distance = (float)$f["distance"];
            $ms = (float)$f["ms"];
            $trip[] = $f;
            $trip_data['distance'] += $distance;
            $trip_data['duration'] += $ms;
            // km/h per the original comment (presumably metres and milliseconds).
            $trip_data['speed'] += $ms > 0 ? (3600 * $distance / $ms) : 0;
            $trip_data['total'] += 1;
        }

        $last_device_id = $f["device_id"];
        $processed++;
    }
    pg_free_result($r);
    return $processed;
}
/**
 * Insert or update one row of members_tracking_trips.
 *
 * The trip is identified by (member_id, location_start, location_end,
 * device_id). An existing row is updated, otherwise a new one is inserted.
 * Both endpoints are resolved to address ids through
 * processMemberGPSTrackingTripsReverseGeocode().
 *
 * @param int|string $member_id Owner of the trip.
 * @param array      $first     First row of the trip (reads device_id, last_id, gps_end).
 * @param array      $last      Last row of the trip (reads last_id, gps_end).
 * @param array      $trip_data Totals with keys distance, duration (milliseconds), speed, total.
 * @return array [row, NULL] with the saved row on success, [NULL, message] on failure.
 */
function processMemberGPSTrackingTripsSave($member_id,$first,$last,$trip_data) {
    global $gpsconn;

    $distance = (float)$trip_data['distance'];
    $duration = (float)$trip_data['duration'] / 1000; // milliseconds -> seconds
    if (!is_array($first) || !is_array($last) || !($distance > 0 && $duration > 0)) {
        return [NULL, "Distance and/or duration are invalid!"];
    }

    // $duration > 0 guarantees the divisor below is non-zero.
    $speed = 3600 * $distance / (float)$trip_data['duration'];
    $avg_speed = $trip_data['total'] > 0 ? ($trip_data['speed'] / $trip_data['total']) : 0;

    // Everything interpolated into SQL is either cast to a number or escaped.
    $db_member_id = (int)$member_id;
    $device_id = ($first['device_id'] > 0 ? (int)$first['device_id'] : "NULL");
    $device_filter = ($device_id === "NULL" ? "device_id IS NULL" : "device_id={$device_id}");
    $location_start = (int)$first['last_id'];
    $location_end = (int)$last['last_id'];
    $gps_start = pg_escape_string($gpsconn, (string)$first['gps_end']);
    $gps_end = pg_escape_string($gpsconn, (string)$last['gps_end']);

    // The geocoder returns an address id or the literal string "NULL"; either
    // way the value must reach SQL unquoted.
    $address_start = processMemberGPSTrackingTripsReverseGeocode($first['gps_end']);
    $address_end = processMemberGPSTrackingTripsReverseGeocode($last['gps_end']);
    $address_start = is_numeric($address_start) ? (int)$address_start : "NULL";
    $address_end = is_numeric($address_end) ? (int)$address_end : "NULL";

    $where = "WHERE member_id={$db_member_id} ";
    $where.= "AND location_start={$location_start} AND location_end={$location_end} ";
    $where.= "AND ".$device_filter;

    // Does this trip already exist?
    $exists = false;
    $r = pg_query($gpsconn, "SELECT 1 FROM members_tracking_trips ".$where." LIMIT 1");
    if ($r) {
        $exists = pg_num_rows($r) > 0;
        pg_free_result($r);
    }

    if ($exists) {
        // address_* used to be quoted here, which turned the "NULL" sentinel
        // into the string 'NULL'; they are now unquoted exactly as in the INSERT.
        $q = "UPDATE members_tracking_trips SET distance={$distance},duration={$duration},";
        $q.= "speed={$speed},avg_speed={$avg_speed},gps_start='{$gps_start}',gps_end='{$gps_end}',";
        $q.= "address_start={$address_start},address_end={$address_end} ";
        $q.= $where;
        $q.= " RETURNING *";
    } else {
        $q = "INSERT INTO members_tracking_trips (member_id,distance,duration,speed,avg_speed,location_start,location_end,device_id,gps_start,gps_end,address_start,address_end) VALUES(";
        $q.= "{$db_member_id},{$distance},{$duration},{$speed},{$avg_speed},{$location_start},{$location_end},{$device_id},'{$gps_start}','{$gps_end}',{$address_start},{$address_end}";
        $q.= ") RETURNING *";
    }

    $r = pg_query($gpsconn, $q);
    if ($r && pg_num_rows($r) && $f = pg_fetch_assoc($r)) {
        pg_free_result($r);
        return [$f, NULL];
    }
    // Dump the failing statement into the job log to ease debugging.
    echo "\n$q\n";
    return [NULL, pg_last_error($gpsconn)];
}
/**
 * Resolve a stored GPS geometry value to the id of a row in the address table.
 *
 * The geometry is decoded to latitude/longitude with ST_Y/ST_X on the GPS
 * connection, reverse geocoded through reverseGeocode(), and the resulting
 * address text is looked up (case-insensitively, newest geocoding_date first)
 * on the replica connection.
 *
 * @param string|null $gps Geometry value as read from members_tracking.gps.
 * @return string Address id, or the literal string "NULL" (ready to be placed
 *                unquoted into SQL) whenever any step fails.
 */
function processMemberGPSTrackingTripsReverseGeocode($gps) {
    global $gpsconn, $readOnlyReplicaConn;

    if ($gps==NULL || $gps=="") {
        echo "[".date("Y-m-d H:i:s")."] Empty GPS string\n";
        return "NULL";
    }

    // Escape with the connection that will actually run the statement.
    $db_gps = pg_escape_string($gpsconn, (string)$gps);
    $q = "SELECT ST_Y('{$db_gps}') AS lat, ST_X('{$db_gps}') AS lng";
    $r = pg_query($gpsconn, $q);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        $lat = $f[0];
        $lng = $f[1];
        pg_free_result($r);
    } else {
        echo "[".date("Y-m-d H:i:s")."] Failed to decode GPS: '{$gps}'\n";
        return "NULL";
    }

    list ($address, $err) = reverseGeocode($readOnlyReplicaConn, $lat, $lng);
    // An empty address is a failure whether or not an error message came with
    // it; continuing would look up the address '' in the address table.
    if (!$address) {
        echo "[".date("Y-m-d H:i:s")."] Failed to reverse geocode GPS: {$lat},{$lng}\n";
        return "NULL";
    }

    $db_address = pg_escape_string($readOnlyReplicaConn, (string)$address);
    $q = "SELECT id FROM address WHERE lower(address)=lower('".$db_address."') ORDER BY geocoding_date DESC LIMIT 1";
    //echo "\n\n{$q}\n\n";
    $r = pg_query($readOnlyReplicaConn, $q);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }
    echo "[".date("Y-m-d H:i:s")."] Failed to load address: '{$address}'\n";
    return "NULL";
}
/**
 * Reverse geocode a coordinate: local address table first, remote service second.
 *
 * A successful remote answer is handed to reverseGeocodeServiceSave() before
 * being returned.
 *
 * @param mixed $db  PostgreSQL connection passed through to the helpers.
 * @param mixed $lat Latitude.
 * @param mixed $lng Longitude.
 * @return array [address, NULL] on success, [NULL, error] when both sources fail.
 */
function reverseGeocode($db, $lat, $lng) {
    // Step 1: try the local address table.
    list($cached, $lookupError) = GeocodeReverseGeocode($db, $lat, $lng);
    $cacheHit = is_array($cached) && isset($cached["address"]);
    if ($cacheHit) {
        return [$cached["address"], NULL];
    }

    // Step 2: fall back to the reverse-geocoding service.
    list($remote, $lookupError) = reverseGeocodeService($db, $lat, $lng);
    $usable = is_array($remote) && isset($remote["address"]) && $remote["address"] != "";
    if (!$usable) {
        return [NULL, $lookupError];
    }

    reverseGeocodeServiceSave($db, $remote);
    return [$remote["address"], NULL];
}
/**
 * Look a coordinate up in the local address table.
 *
 * The match is attempted with latitude/longitude rounded to 5, then 4, then 3
 * decimal places; the first row found at the finest matching precision wins.
 *
 * @param mixed $db  PostgreSQL connection to query.
 * @param mixed $lat Latitude (cast to float).
 * @param mixed $lng Longitude (cast to float).
 * @return array [row, NULL] with the matching address row, or
 *               [NULL, pg_last_error($db)] when nothing matched.
 */
function GeocodeReverseGeocode($db, $lat, $lng) {
    $latitude = (float)$lat;
    $longitude = (float)$lng;

    // Finest precision first, coarsest last.
    foreach ([5, 4, 3] as $digits) {
        $sql = "SELECT * FROM address WHERE round(latitude,{$digits})=round({$latitude},{$digits}) AND round(longitude,{$digits})=round({$longitude},{$digits})";
        $result = pg_query($db, $sql);
        if (!$result || !pg_num_rows($result)) {
            continue;
        }
        $row = pg_fetch_assoc($result);
        if ($row) {
            return [$row, NULL];
        }
    }

    // A second lookup against a local singapore_buildings table (same
    // precisions 5 to 3) is disabled in this file:
    /* for ($i=5;$i>2;$i--) {
        $q = "SELECT * FROM singapore_buildings WHERE round(latitude,${i})=round(${db_lat},${i}) AND round(longitude,${i})=round(${db_lng},${i})";
        $r = pg_query($q);
        if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
            return [$f, NULL];
        }
    } //*/

    return [NULL, pg_last_error($db)];
}
/**
 * Store a reverse-geocoding service result in the address table.
 *
 * If an address with the same text (case-insensitive), non-zero coordinates
 * and a geocoding_date already exists, its id is returned and nothing is
 * written. Otherwise a new row is inserted.
 *
 * NOTE(review): the callers in this file pass the connection built from the
 * database_replica.* settings; if that server is really read-only the INSERT
 * below (and the one in GeocodeGetTimezone) cannot succeed - confirm which
 * connection should be used here.
 *
 * @param mixed $db     PostgreSQL connection to use.
 * @param array $result Service payload; reads lat, lng, address, postal, timeZoneId.
 * @return string|null Id of the existing or newly inserted address, NULL on failure.
 */
function reverseGeocodeServiceSave($db, $result) {
    // Missing keys are treated as empty values instead of raising notices.
    $db_lat = floatval(isset($result["lat"]) ? $result["lat"] : 0);
    $db_lng = floatval(isset($result["lng"]) ? $result["lng"] : 0);
    // Escape with the connection the statements run on, not the default one.
    $db_address = pg_escape_string($db, (string)(isset($result["address"]) ? $result["address"] : ""));
    $db_postal = pg_escape_string($db, (string)(isset($result["postal"]) ? $result["postal"] : ""));
    $db_tz = GeocodeGetTimezone($db, isset($result["timeZoneId"]) ? $result["timeZoneId"] : NULL);

    // Check the address exists
    $q = "SELECT id FROM address WHERE lower(address)=lower('{$db_address}') ";
    $q.= "AND latitude<>0 AND longitude<>0 AND geocoding_date IS NOT NULL ";
    $q.= "ORDER BY geocoding_date DESC LIMIT 1";
    //error_log($q);
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    $q = "INSERT INTO address (address,latitude,longitude,timezone,geocoding_date,postal) VALUES (";
    $q.= "'{$db_address}',{$db_lat},{$db_lng},".($db_tz==NULL ? "NULL" : (int)$db_tz).",now(),'{$db_postal}')";
    $q.= " RETURNING id";
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }
    return NULL;
}
/**
 * Return the id of a timezone name in address_timezone, inserting it if absent.
 *
 * NOTE(review): there is no locking between the SELECT and the INSERT, so two
 * concurrent runs could presumably insert the same name twice unless the
 * table has a unique constraint - confirm.
 *
 * @param mixed       $db       PostgreSQL connection to use.
 * @param string|null $timezone Timezone identifier as returned by the geocoding service.
 * @return string|null Row id, or NULL when the name is empty or the row could
 *                     be neither found nor inserted.
 */
function GeocodeGetTimezone($db, $timezone) {
    // A missing timezone must not create (or match) a blank-named row;
    // callers already treat NULL as "no timezone".
    if ($timezone === NULL || $timezone === "") {
        return NULL;
    }

    // Escape with the connection the statements run on, not the default one.
    $db_timezone = pg_escape_string($db, (string)$timezone);

    $q = "SELECT id FROM address_timezone WHERE timezone='{$db_timezone}'";
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    $q = "INSERT INTO address_timezone (timezone) VALUES('{$db_timezone}') RETURNING id";
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }
    return NULL;
}
/**
 * Ask the HTTP reverse-geocoding service for the address of a coordinate.
 *
 * Performs a GET on http://oauth2.service/api/v1/reverse with lat/lng query
 * parameters and a "Server-Token" Authorization header, then decodes the JSON
 * answer.
 *
 * @param mixed $db  Unused; kept so the signature matches the other geocode helpers.
 * @param mixed $lat Latitude.
 * @param mixed $lng Longitude.
 * @return array [data, NULL] with the decoded "data" array on success,
 *               [NULL, message] on transport failure or an error answer.
 */
function reverseGeocodeService($db, $lat, $lng) {
    global $httpAuthToken;

    $data = http_build_query(
        array(
            'lat' => $lat,
            'lng' => $lng
        )
    );
    $url = "http://oauth2.service/api/v1/reverse?" . $data;
    //echo "\n\n{$url}\n\n";
    $opts = array(
        'http' => array(
            'method' => "GET",
            'timeout' => 60, /* 1 minute */
            'header' =>
                "Content-Type: application/x-www-form-urlencoded\r\n" .
                "Accept: application/json\r\n" .
                "Authorization: Server-Token {$httpAuthToken}\r\n",
        )
    );
    $context = stream_context_create($opts);

    $body = file_get_contents($url, false, $context);
    if ($body === false) {
        // Connection failure, timeout or HTTP error status: there is no body to decode.
        return [NULL, "Reverse geocoding service call error: no response"];
    }
    //error_log("BODY: ".$body);

    $geocoded = json_decode($body, true);
    $has_error = is_array($geocoded) && isset($geocoded["error"]);
    if (is_array($geocoded) && !$has_error
        && isset($geocoded["data"]) && is_array($geocoded["data"])) {
        return [$geocoded["data"], NULL];
    }
    if ($has_error) {
        // The error may be a structured value; make sure it concatenates cleanly.
        $body = is_string($geocoded["error"]) ? $geocoded["error"] : json_encode($geocoded["error"]);
    }
    return [NULL, "Reverse geocoding service call error: ".$body];
}
// Job epilogue: release both PostgreSQL connections (replica first, then the
// GPS database) and write the closing line to the job log.
foreach ([$readOnlyReplicaConn, $gpsconn] as $openConnection) {
    pg_close($openConnection);
}
$finishedAt = date("Y-m-d H:i:s");
echo "[" . $finishedAt . "] GPS members_tracking_trips job complete.\n";