318 lines
13 KiB
PHP
318 lines
13 KiB
PHP
<?php
|
|
|
|
echo "[".date("Y-m-d H:i:s")."] GPS members_tracking_trips job is starting.\n";

// backend.php is expected to define $savvyext, the configuration reader used below.
require('../backend.php');

// Per-member batch size: an unlimited query could exhaust the allowed memory,
// so each run handles at most $hard_limit rows per member and later runs catch up.
$hard_limit = 500000;
// GPS coordinate precision to assume the same location
// NOTE(review): $precision is not read anywhere in the visible code.
$precision = 3;
// How many seconds is a stop?
$stop_limit = 120;

// Token sent in the Authorization header by reverseGeocodeService().
$httpAuthToken = $savvyext->cfgReadChar('system.oauth2_token');

// Connection used for address lookups (and address inserts, see reverseGeocodeServiceSave()).
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');
$readOnlyReplicaConnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);
if (!$readOnlyReplicaConn) {
    // Nothing below can work without this connection; stop instead of failing on every query.
    echo "[".date("Y-m-d H:i:s")."] Failed to connect to the replica database. Abort.\n";
    exit(1);
}

// Connection holding members_tracking, members_tracking_duration and members_tracking_trips.
$db_host = $savvyext->cfgReadChar('gpsdatabase.host');
$db_name = $savvyext->cfgReadChar('gpsdatabase.name');
$db_user = $savvyext->cfgReadChar('gpsdatabase.user');
$db_pass = $savvyext->cfgReadChar('gpsdatabase.pass');
$db_port = $savvyext->cfgReadLong('gpsdatabase.port');
$gpsconnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$gpsconn = pg_connect($gpsconnstr);
if (!$gpsconn) {
    echo "[".date("Y-m-d H:i:s")."] Failed to connect to the GPS database. Abort.\n";
    pg_close($readOnlyReplicaConn);
    exit(1);
}

// One row per member that has tracking data, with its total record count for the log.
$q = "select member_id,count(*) from members_tracking ";
//$q.= "where member_id=55 "; // DEBUG
$q.= "group by member_id";
$r = pg_query($gpsconn, $q);
if (!$r) {
    echo "[".date("Y-m-d H:i:s")."] Failed to list members: ".pg_last_error($gpsconn)."\n";
} else {
    while ($f = pg_fetch_row($r)) {
        $member_id = $f[0];
        echo "[".date("Y-m-d H:i:s")."] Processing member_id=".$member_id." (total member records: ".$f[1].")\n";
        $p = processMemberGPSTrackingTrips($member_id);
        echo "[".date("Y-m-d H:i:s")."] Processed member_id=".$member_id." (processed records: ".$p.")\n";
    }
    pg_free_result($r);
}
|
|
|
|
/**
 * Build and store trips for one member from rows of members_tracking_duration.
 *
 * The highest location id already stored in members_tracking_trips for the member
 * is used as a watermark; only duration rows whose first_id and last_id are above
 * it are read (ordered by device and first_time, at most $hard_limit rows).
 * Consecutive rows are accumulated into a trip. A row whose inner_duration reaches
 * $stop_limit, or a change of device_id while a trip is open, closes the trip and
 * passes it to processMemberGPSTrackingTripsSave(). Rows accumulated after the
 * last closing row are not saved by this call; since the watermark only advances
 * on saved trips, they are read again on the next run.
 *
 * @param int|string $member_id Member whose tracking rows are processed.
 * @return int Number of rows read (0 when there is nothing new or a query failed).
 */
function processMemberGPSTrackingTrips($member_id) {
    global $gpsconn,$hard_limit,$stop_limit;

    // NOTE(review): the label below does not match this function's name; the text is
    // left unchanged so existing log searches keep working.
    echo "[".date("Y-m-d H:i:s")."] processMemberGPSDataPartition($member_id)\n";

    // Integer forms used only inside SQL text.
    $db_member_id = (int)$member_id;
    $db_limit = (int)$hard_limit;

    // Watermark: highest location id referenced by an already stored trip.
    $id = 0;
    $q = 'select max(location_start),max(location_end) from members_tracking_trips where member_id=' . $db_member_id;
    $r = pg_query($gpsconn, $q);
    if ($r) {
        if ($f = pg_fetch_row($r)) {
            $id = max((int)$f[0],(int)$f[1]);
        }
        pg_free_result($r);
    }
    echo "[".date("Y-m-d H:i:s")."] Last processed ID is {$id}\n";

    $q = "select a.first_id,a.last_id,a.device_id,extract(milliseconds from a.duration) as ms, ";
    $q.= "extract(epoch from a.duration) as seconds, c.speed, a.inner_duration, a.total, ";
    $q.= "ST_Distance(c.gps,b.gps) AS distance, b.gps AS gps_start, c.gps AS gps_end,";
    $q.= "EXTRACT(epoch FROM c.ttime-b.ttime) AS travel_time ";
    $q.= "from members_tracking_duration a ";
    $q.= "left join members_tracking b on (b.id=a.first_id) ";
    $q.= "left join members_tracking c on (c.id=a.last_id) ";
    $q.= "where a.member_id={$db_member_id} ";
    $q.= "and a.first_id>{$id} AND a.last_id>{$id} ";
    $q.= "order by a.device_id, a.first_time limit {$db_limit}";

    // The query is executed once and the same result is used for counting and iterating.
    $r = pg_query($gpsconn, $q);
    if (!$r) {
        echo "[".date("Y-m-d H:i:s")."] Failed to load records: ".pg_last_error($gpsconn)."\n";
        return 0;
    }
    $available = pg_num_rows($r);
    if ($available < 1) {
        pg_free_result($r);
        echo "[".date("Y-m-d H:i:s")."] No new records to process. Stop.\n";
        return 0;
    }
    echo "[".date("Y-m-d H:i:s")."] Records to process: ".$available."\n";

    echo "[".date("Y-m-d H:i:s")."] Processing";

    $empty_trip_data = ['distance'=>0,'duration'=>0,'total'=>0,'speed'=>0];
    $trip = [];
    $trip_data = $empty_trip_data;
    $processed = 0;
    $last_device_id = -1;
    while ($f = pg_fetch_assoc($r)) {
        // assumes inner_duration is a plain number of seconds — TODO confirm column type
        $is_stop = $f["inner_duration"] >= $stop_limit;
        $device_changed = ($last_device_id != $f["device_id"] && count($trip) > 0);
        if ($is_stop || $device_changed) {
            // NOTE(review): the row that closes a trip is not added to any trip, including
            // the first row of a new device — confirm this is intended.
            if (count($trip) > 0) {
                list ($res, $err) = processMemberGPSTrackingTripsSave(
                    $member_id,
                    $trip[0],
                    $trip[count($trip)-1],
                    $trip_data);
                if ($res && $res["id"]>0) {
                    echo "[".date("Y-m-d H:i:s")."] New trip processed: ".$res["id"]." (member_id={$member_id})\n";
                } else {
                    echo "[".date("Y-m-d H:i:s")."] Failed to process trip: {$err} (member_id={$member_id})\n";
                    var_dump($trip_data); // DEBUG
                }
            }
            // Start a fresh trip; consecutive closing rows leave it empty and are skipped above.
            $trip = [];
            $trip_data = $empty_trip_data;
        } else {
            $trip[] = $f;
            $trip_data['distance'] += $f["distance"];
            $trip_data['duration'] += $f["ms"];
            // Per-row speed; summed here and averaged over 'total' when the trip is saved.
            $trip_data['speed'] += $f["ms"]>0 ? (3600 * $f["distance"] / $f["ms"]) : 0; // km/h (presumably distance is in metres — verify)
            $trip_data['total'] += 1;
        }
        $last_device_id = $f["device_id"];
        $processed++;
    }
    pg_free_result($r);
    return $processed;
}
|
|
|
|
/**
 * Insert or update one row of members_tracking_trips.
 *
 * A trip is identified by member_id, location_start, location_end and device_id.
 * When such a row exists it is updated, otherwise a new row is inserted.
 * Start and end addresses are resolved through
 * processMemberGPSTrackingTripsReverseGeocode().
 *
 * @param int|string $member_id Owner of the trip.
 * @param array $first First row of the trip; keys device_id, last_id and gps_end are read.
 * @param array $last Last row of the trip; keys last_id and gps_end are read.
 * @param array $trip_data Totals with keys distance, duration (milliseconds), speed (sum of per-row speeds) and total (row count).
 * @return array [row, NULL] with the stored row on success, [NULL, message] on failure.
 */
function processMemberGPSTrackingTripsSave($member_id,$first,$last,$trip_data) {
    global $gpsconn;

    $distance = $trip_data['distance'];
    $duration = $trip_data['duration']/1000; // milliseconds -> seconds

    if (!($distance>0 && $duration>0)) {
        return [NULL, "Distance and/or duration are invalid!"];
    }

    // Overall speed from totals, and the mean of the per-row speeds.
    $speed = 3600 * $trip_data['distance'] / $trip_data['duration'];
    $avg_speed = $trip_data['total']>0 ? $trip_data['speed'] / $trip_data['total'] : 0;

    // Integer forms used only inside SQL text.
    $db_member_id = (int)$member_id;
    $has_device = ($first['device_id']>0);
    $device_id = $has_device ? (int)$first['device_id'] : "NULL";
    $location_start = (int)$first['last_id'];
    $location_end = (int)$last['last_id'];

    $gps_start = $first['gps_end'];
    $gps_end = $last['gps_end'];

    // The geocode helper returns an address id or the string "NULL"; anything
    // non-numeric is stored as SQL NULL.
    $address_start = processMemberGPSTrackingTripsReverseGeocode($gps_start);
    $address_end = processMemberGPSTrackingTripsReverseGeocode($gps_end);
    $db_address_start = is_numeric($address_start) ? (int)$address_start : "NULL";
    $db_address_end = is_numeric($address_end) ? (int)$address_end : "NULL";

    // A missing GPS value becomes SQL NULL instead of an empty quoted literal.
    $db_gps_start = ($gps_start===NULL || $gps_start==="") ? "NULL" : pg_escape_literal($gpsconn, $gps_start);
    $db_gps_end = ($gps_end===NULL || $gps_end==="") ? "NULL" : pg_escape_literal($gpsconn, $gps_end);

    // Identity of the trip, shared by the lookup and the UPDATE.
    $where = "WHERE member_id={$db_member_id} ";
    $where.= "AND location_start={$location_start} AND location_end={$location_end} ";
    $where.= "AND ".($has_device ? "device_id={$device_id}" : "device_id IS NULL");

    $r = pg_query($gpsconn, "SELECT * FROM members_tracking_trips ".$where);
    $exists = ($r && pg_num_rows($r) > 0);
    if ($r) {
        pg_free_result($r);
    }

    if ($exists) {
        // Address ids are written unquoted here, exactly as in the INSERT below,
        // so the NULL sentinel is a real SQL NULL and not the string 'NULL'.
        $q = "UPDATE members_tracking_trips SET distance={$distance},duration={$duration},";
        $q.= "speed={$speed},avg_speed={$avg_speed},gps_start={$db_gps_start},gps_end={$db_gps_end},";
        $q.= "address_start={$db_address_start},address_end={$db_address_end} ";
        $q.= $where;
        $q.= " RETURNING *";
    } else {
        $q = "INSERT INTO members_tracking_trips (member_id,distance,duration,speed,avg_speed,location_start,location_end,device_id,gps_start,gps_end,address_start,address_end) VALUES(";
        $q.= "{$db_member_id},{$distance},{$duration},{$speed},{$avg_speed},{$location_start},{$location_end},{$device_id},{$db_gps_start},{$db_gps_end},{$db_address_start},{$db_address_end}";
        $q.= ") RETURNING *";
    }

    $r = pg_query($gpsconn, $q);
    if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
        pg_free_result($r);
        return [$f, NULL];
    }
    // Print the failing statement to help diagnose the error reported to the caller.
    echo "\n$q\n";
    return [NULL, pg_last_error($gpsconn)];
}
|
|
|
|
/**
 * Resolve a stored GPS value to the id of a row in the address table.
 *
 * The value is decoded to latitude/longitude with ST_Y/ST_X on the GPS
 * connection, reverse geocoded through reverseGeocode(), and the resulting
 * address text is looked up (case-insensitively, newest geocoding_date first)
 * on the replica connection.
 *
 * @param string|null $gps GPS value as read from members_tracking.gps.
 * @return string Address id, or the literal string "NULL" when any step fails
 *                (callers embed the return value directly into SQL).
 */
function processMemberGPSTrackingTripsReverseGeocode($gps) {
    global $gpsconn, $readOnlyReplicaConn;

    if ($gps==NULL || $gps=="") {
        echo "[".date("Y-m-d H:i:s")."] Empty GPS string\n";
        return "NULL";
    }

    // Escape with the connection the statement runs on.
    $db_gps = pg_escape_string($gpsconn, $gps);
    $q = "SELECT ST_Y('{$db_gps}') AS lat, ST_X('{$db_gps}') AS lng";
    $r = pg_query($gpsconn, $q);
    if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
        $lat = $f[0];
        $lng = $f[1];
        pg_free_result($r);
    } else {
        echo "[".date("Y-m-d H:i:s")."] Failed to decode GPS: '{$gps}'\n";
        return "NULL";
    }

    list ($address,$err) = reverseGeocode($readOnlyReplicaConn, $lat, $lng);
    // An empty address cannot be matched below, whether or not an error text came with it.
    if ($address===NULL || $address==="") {
        echo "[".date("Y-m-d H:i:s")."] Failed to reverse geocode GPS: {$lat},{$lng}\n";
        return "NULL";
    }

    // The lookup runs on the replica connection, so the text is escaped for that connection.
    $db_address = pg_escape_string($readOnlyReplicaConn, $address);
    $q = "SELECT id FROM address WHERE lower(address)=lower('".$db_address."') ORDER BY geocoding_date DESC LIMIT 1";
    $r = pg_query($readOnlyReplicaConn, $q);
    if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    echo "[".date("Y-m-d H:i:s")."] Failed to load address: '{$address}'\n";
    return "NULL";
}
|
|
|
|
/**
 * Turn a coordinate pair into an address string.
 *
 * The local address table is consulted first through GeocodeReverseGeocode().
 * Only when that yields no row with an "address" key is the remote service
 * asked through reverseGeocodeService(); a non-empty remote answer is stored
 * with reverseGeocodeServiceSave() before being returned.
 *
 * @param mixed $db PostgreSQL connection handed on to the lookup and save helpers.
 * @param mixed $lat Latitude.
 * @param mixed $lng Longitude.
 * @return array [address, NULL] on success, [NULL, error] when both sources fail.
 */
function reverseGeocode($db, $lat, $lng) {
    // Step 1: local database.
    $local = GeocodeReverseGeocode($db, $lat, $lng);
    $hit = $local[0];
    if (is_array($hit) && isset($hit["address"])) {
        return [$hit["address"], NULL];
    }

    // Step 2: remote reverse geocoding service.
    $remote = reverseGeocodeService($db, $lat, $lng);
    $hit = $remote[0];
    $failure = $remote[1];
    if (!is_array($hit) || !isset($hit["address"]) || $hit["address"]=="") {
        return [NULL, $failure];
    }

    // Keep the remote answer so it is available locally afterwards.
    reverseGeocodeServiceSave($db, $hit);
    return [$hit["address"], NULL];
}
|
|
|
|
/**
 * Look up a coordinate pair in the local address table.
 *
 * Both the stored and the requested coordinates are rounded to the same number
 * of decimals; 5, 4 and 3 decimals are tried in that order and the first row
 * found wins.
 *
 * @param mixed $db PostgreSQL connection to query.
 * @param mixed $lat Latitude, converted to float before use.
 * @param mixed $lng Longitude, converted to float before use.
 * @return array [row, NULL] with the matching address row, or [NULL, pg_last_error($db)] when nothing matched.
 */
function GeocodeReverseGeocode($db, $lat, $lng) {
    $latitude = floatval($lat);
    $longitude = floatval($lng);

    // Address table, from the finest to the coarsest rounding.
    foreach (array(5, 4, 3) as $digits) {
        $sql = "SELECT * FROM address WHERE round(latitude,{$digits})=round({$latitude},{$digits}) AND round(longitude,{$digits})=round({$longitude},{$digits})";
        $result = pg_query($db, $sql);
        if (!$result || !pg_num_rows($result)) {
            continue;
        }
        $row = pg_fetch_assoc($result);
        if ($row) {
            return [$row, NULL];
        }
    }

    // Disabled: the same lookup against the singapore_buildings table.
    /* for ($i=5;$i>2;$i--) {
        $q = "SELECT * FROM singapore_buildings WHERE round(latitude,${i})=round(${db_lat},${i}) AND round(longitude,${i})=round(${db_lng},${i})";
        $r = pg_query($q);
        if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
            return [$f, NULL];
        }
    } //*/

    return [NULL, pg_last_error($db)];
}
|
|
|
|
/**
 * Store a reverse geocoding result in the address table unless it is already there.
 *
 * An existing row with the same address text (case-insensitive), non-zero
 * coordinates and a geocoding_date is reused; otherwise a new row is inserted.
 *
 * NOTE(review): the only visible caller passes the connection named
 * $readOnlyReplicaConn; the INSERT below needs a writable connection — confirm.
 *
 * @param mixed $db PostgreSQL connection to use.
 * @param array $result Service result; keys address, lat, lng, postal and timeZoneId are read.
 * @return string|null Id of the existing or newly inserted address row, NULL when the insert failed.
 */
function reverseGeocodeServiceSave($db, $result) {
    $db_lat = floatval(isset($result["lat"]) ? $result["lat"] : 0);
    $db_lng = floatval(isset($result["lng"]) ? $result["lng"] : 0);
    // Escape with the connection the statements run on; missing keys become empty text.
    $db_address = pg_escape_string($db, (string)(isset($result["address"]) ? $result["address"] : ""));
    $db_postal = pg_escape_string($db, (string)(isset($result["postal"]) ? $result["postal"] : ""));
    // Resolve the timezone only when the service supplied one, so no empty
    // timezone row is created as a side effect.
    $db_tz = NULL;
    if (isset($result["timeZoneId"]) && $result["timeZoneId"]!=="") {
        $db_tz = GeocodeGetTimezone($db, $result["timeZoneId"]);
    }

    // Check the address exists
    $q = "SELECT id FROM address WHERE lower(address)=lower('{$db_address}') ";
    $q.= "AND latitude<>0 AND longitude<>0 AND geocoding_date IS NOT NULL ";
    $q.= "ORDER BY geocoding_date DESC LIMIT 1";
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    $q = "INSERT INTO address (address,latitude,longitude,timezone,geocoding_date,postal) VALUES (";
    $q.= "'{$db_address}',{$db_lat},{$db_lng},".($db_tz===NULL?"NULL":(int)$db_tz).",now(),'{$db_postal}')";
    $q.= " RETURNING id";
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }
    return NULL;
}
|
|
|
|
/**
 * Return the id of a timezone name in address_timezone, inserting it when absent.
 *
 * @param mixed $db PostgreSQL connection to use.
 * @param string|null $timezone Timezone name as delivered by the geocoding service.
 * @return string|null Row id, or NULL when neither the lookup nor the insert returned a row.
 */
function GeocodeGetTimezone($db, $timezone) {
    // Escape with the connection the statements run on; the cast avoids passing null.
    $db_timezone = pg_escape_string($db, (string)$timezone);

    $q = "SELECT id FROM address_timezone WHERE timezone='{$db_timezone}'";
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    // Not known yet: create it and hand back the new id.
    $q = "INSERT INTO address_timezone (timezone) VALUES('{$db_timezone}') RETURNING id";
    $r = pg_query($db, $q);
    if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }
    return NULL;
}
|
|
|
|
/**
 * Ask the reverse geocoding HTTP service for the address of a coordinate pair.
 *
 * Sends a GET request to http://oauth2.service/api/v1/reverse with the global
 * $httpAuthToken as a Server-Token and a 60 second timeout, then decodes the
 * JSON response.
 *
 * @param mixed $db Unused here; kept so the signature matches the other geocoding helpers.
 * @param mixed $lat Latitude.
 * @param mixed $lng Longitude.
 * @return array [data, NULL] with the response's "data" array on success,
 *               [NULL, message] when the request fails or the response carries an error.
 */
function reverseGeocodeService($db, $lat, $lng) {
    global $httpAuthToken;

    $data = http_build_query(
        array(
            'lat' => $lat,
            'lng' => $lng
        )
    );
    $url = "http://oauth2.service/api/v1/reverse?" . $data;

    $opts = array(
        'http' => array(
            'method' => "GET",
            'timeout' => 60, /* 1 minute */
            'header' =>
                "Content-Type: application/x-www-form-urlencoded\r\n" .
                "Accept: application/json\r\n" .
                "Authorization: Server-Token {$httpAuthToken}\r\n",
        )
    );
    $context = stream_context_create($opts);

    $body = file_get_contents($url, false, $context);
    if ($body === false) {
        // Transport failure or an HTTP error status: there is no body to decode.
        return [NULL, "Reverse geocoding service call error: request failed"];
    }

    $geocoded = json_decode($body,true);
    if (is_array($geocoded)) {
        if (isset($geocoded["error"])) {
            // The error may be structured; make it printable before concatenating.
            $error = $geocoded["error"];
            if (!is_scalar($error)) {
                $error = json_encode($error);
            }
            return [NULL, "Reverse geocoding service call error: ".$error];
        }
        if (isset($geocoded["data"]) && is_array($geocoded["data"])) {
            return [$geocoded["data"], NULL];
        }
    }

    // Not JSON, or JSON without a usable "data" array: report the raw body.
    return [NULL, "Reverse geocoding service call error: ".$body];
}
|
|
|
|
// Job finished: close both PostgreSQL connections (replica first, then GPS) and log completion.
foreach (array($readOnlyReplicaConn, $gpsconn) as $connection) {
    pg_close($connection);
}
printf("[%s] GPS members_tracking_trips job complete.\n", date("Y-m-d H:i:s"));
|