first commit
This commit is contained in:
@@ -0,0 +1,317 @@
|
||||
<?php
|
||||
|
||||
echo "[".date("Y-m-d H:i:s")."] GPS members_tracking_trips job is starting.\n";
|
||||
|
||||
require('../backend.php');
|
||||
|
||||
// We will not limit the SQL we will exhaust the allowed memory
|
||||
// we will process data in ${hard_limit} batches and hopefully catch up
|
||||
$hard_limit = 500000;
|
||||
// GPS coordinate precision to assume the same location
|
||||
$precision = 3;
|
||||
// How many seconds is a stop?
|
||||
$stop_limit = 120;
|
||||
|
||||
$httpAuthToken = $savvyext->cfgReadChar('system.oauth2_token');
|
||||
|
||||
$db_host = $savvyext->cfgReadChar('database_replica.host');
|
||||
$db_name = $savvyext->cfgReadChar('database_replica.name');
|
||||
$db_user = $savvyext->cfgReadChar('database_replica.user');
|
||||
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
|
||||
$db_port = $savvyext->cfgReadLong('database_replica.port');
|
||||
$readOnlyReplicaConnstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}";
|
||||
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);
|
||||
|
||||
$db_host = $savvyext->cfgReadChar('gpsdatabase.host');
|
||||
$db_name = $savvyext->cfgReadChar('gpsdatabase.name');
|
||||
$db_user = $savvyext->cfgReadChar('gpsdatabase.user');
|
||||
$db_pass = $savvyext->cfgReadChar('gpsdatabase.pass');
|
||||
$db_port = $savvyext->cfgReadLong('gpsdatabase.port');
|
||||
$gpsconnstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}";
|
||||
$gpsconn = pg_connect($gpsconnstr);
|
||||
|
||||
$q = "select member_id,count(*) from members_tracking ";
|
||||
//$q.= "where member_id=55 "; // DEBUG
|
||||
$q.= "group by member_id";
|
||||
$r = pg_query($gpsconn, $q);
|
||||
if ($r && pg_num_rows($r)) {
|
||||
while ($f=pg_fetch_row($r)) {
|
||||
$member_id = $f[0];
|
||||
echo "[".date("Y-m-d H:i:s")."] Processing member_id=".$member_id." (total member records: ".$f[1].")\n";
|
||||
$p = processMemberGPSTrackingTrips($member_id);
|
||||
echo "[".date("Y-m-d H:i:s")."] Processed member_id=".$member_id." (processed records: ".$p.")\n";
|
||||
}
|
||||
}
|
||||
|
||||
function processMemberGPSTrackingTrips($member_id) {
|
||||
global $gpsconn,$precision,$hard_limit,$stop_limit;
|
||||
echo "[".date("Y-m-d H:i:s")."] processMemberGPSDataPartition($member_id)\n";
|
||||
$id = 0;
|
||||
$q = 'select max(location_start),max(location_end) from members_tracking_trips where member_id=' . $member_id;
|
||||
$r = pg_query($gpsconn, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
||||
$id = max((int)$f[0],(int)$f[1]);
|
||||
}
|
||||
echo "[".date("Y-m-d H:i:s")."] Last processed ID is ${id}\n";
|
||||
|
||||
$q = "select a.first_id,a.last_id,a.device_id,extract(milliseconds from a.duration) as ms, ";
|
||||
$q.= "extract(epoch from a.duration) as seconds, c.speed, a.inner_duration, a.total, ";
|
||||
$q.= "ST_Distance(c.gps,b.gps) AS distance, b.gps AS gps_start, c.gps AS gps_end,";
|
||||
$q.= "EXTRACT(epoch FROM c.ttime-b.ttime) AS travel_time ";
|
||||
$q.= "from members_tracking_duration a ";
|
||||
$q.= "left join members_tracking b on (b.id=a.first_id) ";
|
||||
$q.= "left join members_tracking c on (c.id=a.last_id) ";
|
||||
$q.= "where a.member_id=${member_id} ";
|
||||
$q.= "and a.first_id>${id} AND a.last_id>${id} "; // Limit ?
|
||||
$q.= "order by a.device_id, a.first_time limit ${hard_limit}";
|
||||
//echo "\n$q\n";
|
||||
$r = pg_query($gpsconn, $q);
|
||||
|
||||
if ($r && pg_num_rows($r)) {
|
||||
echo "[".date("Y-m-d H:i:s")."] Records to process: ".pg_num_rows($r)."\n";
|
||||
} else {
|
||||
echo "[".date("Y-m-d H:i:s")."] No new records to process. Stop.\n";
|
||||
return 0;
|
||||
}
|
||||
|
||||
echo "[".date("Y-m-d H:i:s")."] Processing";
|
||||
|
||||
$trip = [];
|
||||
$trips = [];
|
||||
$trip_data = ['distance'=>0,'duration'=>0,'total'=>0,'speed'=>0];
|
||||
$trips_data = [];
|
||||
$processed = 0;
|
||||
$last_device_id = -1;
|
||||
$r = pg_query($gpsconn, $q);
|
||||
while ($f=pg_fetch_assoc($r)) {
|
||||
//$res[] = $f;
|
||||
//if ($f["seconds"]>=$stop_limit || ($last_device_id!=$f["device_id"] && count($trip)>0)) {
|
||||
if ($f["inner_duration"]>=$stop_limit || ($last_device_id!=$f["device_id"] && count($trip)>0)) {
|
||||
$trips[] = $trip;
|
||||
$trips_data[] = $trip_data;
|
||||
$n = count($trip)-1;
|
||||
list ($res, $err) = processMemberGPSTrackingTripsSave(
|
||||
$member_id,
|
||||
$trip[0],
|
||||
array_pop($trip),
|
||||
$trip_data);
|
||||
if ($res && $res["id"]>0) {
|
||||
echo "[".date("Y-m-d H:i:s")."] New trip processed: ".$res["id"]." (member_id=${member_id})\n";
|
||||
} else {
|
||||
echo "[".date("Y-m-d H:i:s")."] Failed to process trip: ${err} (member_id=${member_id})\n";
|
||||
var_dump($trip_data); // DEBUG
|
||||
}
|
||||
unset($trip);
|
||||
unset($trip_data);
|
||||
$trip = [];
|
||||
$trip_data = ['distance'=>0,'duration'=>0,'total'=>0,'speed'=>0];
|
||||
} else {
|
||||
$trip[] = $f;
|
||||
$trip_data['distance'] += $f["distance"];
|
||||
$trip_data['duration'] += $f["ms"];
|
||||
$trip_data['speed'] += $f["ms"]>0 ? (3600 * $f["distance"] / $f["ms"]) : 0; // km/h
|
||||
$trip_data['total'] += 1;
|
||||
}
|
||||
$last_device_id = $f["device_id"];
|
||||
$processed++;
|
||||
}
|
||||
pg_free_result($r);
|
||||
return $processed;
|
||||
}
|
||||
|
||||
function processMemberGPSTrackingTripsSave($member_id,$first,$last,$trip_data) {
|
||||
global $gpsconn;
|
||||
|
||||
$distance = $trip_data['distance'];
|
||||
$duration = $trip_data['duration']/1000;
|
||||
|
||||
if ($distance>0 && $duration>0) {
|
||||
// OK
|
||||
} else {
|
||||
return [NULL, "Distance and/or duration are invalid!"];
|
||||
}
|
||||
|
||||
$speed = 3600 * $trip_data['distance'] / $trip_data['duration'];
|
||||
$avg_speed = $trip_data['speed'] / $trip_data['total'];
|
||||
$device_id = ($first['device_id']>0 ? $first['device_id'] : "NULL");
|
||||
|
||||
$location_start = $first['last_id'];
|
||||
$location_end = $last['last_id'];
|
||||
$gps_start = $first['gps_end'];
|
||||
$gps_end = $last['gps_end'];
|
||||
|
||||
$address_start = processMemberGPSTrackingTripsReverseGeocode($gps_start);
|
||||
$address_end = processMemberGPSTrackingTripsReverseGeocode($gps_end);
|
||||
|
||||
$q = "SELECT * FROM members_tracking_trips WHERE member_id=${member_id} ";
|
||||
$q.= "AND location_start=${location_start} AND location_end=${location_end} ";
|
||||
$q.= "AND ".($device_id=="NULL"?"device_id IS NULL":"device_id=${device_id}");
|
||||
$r = pg_query($gpsconn, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
|
||||
$q = "UPDATE members_tracking_trips SET distance=${distance},duration=${duration},";
|
||||
$q.= "speed=${speed},avg_speed=${avg_speed},gps_start='${gps_start}',gps_end='${gps_end}',";
|
||||
$q.= "address_start='${address_start}',address_end='${address_end}' WHERE member_id=${member_id} ";
|
||||
$q.= "AND location_start=${location_start} AND location_end=${location_end} ";
|
||||
$q.= "AND ".($device_id=="NULL"?"device_id IS NULL":"device_id=${device_id}");
|
||||
$q.=" RETURNING *";
|
||||
} else {
|
||||
$q = "INSERT INTO members_tracking_trips (member_id,distance,duration,speed,avg_speed,location_start,location_end,device_id,gps_start,gps_end,address_start,address_end) VALUES(";
|
||||
$q.= "${member_id},${distance},${duration},${speed},${avg_speed},${location_start},${location_end},${device_id},'${gps_start}','${gps_end}',${address_start},${address_end}";
|
||||
$q.= ") RETURNING *";
|
||||
}
|
||||
$r = pg_query($gpsconn, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
|
||||
pg_free_result($r);
|
||||
return [$f, NULL];
|
||||
}
|
||||
echo "\n$q\n";
|
||||
return [NULL, pg_last_error($gpsconn)];
|
||||
}
|
||||
|
||||
function processMemberGPSTrackingTripsReverseGeocode($gps) {
|
||||
global $gpsconn, $readOnlyReplicaConn;
|
||||
if ($gps==NULL || $gps=="") {
|
||||
echo "[".date("Y-m-d H:i:s")."] Empty GPS string\n";
|
||||
return "NULL";
|
||||
}
|
||||
$q = "SELECT ST_Y('${gps}') AS lat, ST_X('${gps}') AS lng";
|
||||
$r = pg_query($gpsconn, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
||||
$lat = $f[0];
|
||||
$lng = $f[1];
|
||||
} else {
|
||||
echo "[".date("Y-m-d H:i:s")."] Failed to decode GPS: '${gps}'\n";
|
||||
return "NULL";
|
||||
}
|
||||
list ($address,$err) = reverseGeocode($readOnlyReplicaConn, $lat, $lng);
|
||||
if (!$address && $err) {
|
||||
echo "[".date("Y-m-d H:i:s")."] Failed to reverse geocode GPS: ${lat},${lng}\n";
|
||||
return "NULL";
|
||||
}
|
||||
$q = "SELECT id FROM address WHERE lower(address)=lower('".pg_escape_string($address)."') ORDER BY geocoding_date DESC LIMIT 1";
|
||||
//echo "\n\n${q}\n\n";
|
||||
$r = pg_query($readOnlyReplicaConn, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
||||
return $f[0];
|
||||
}
|
||||
echo "[".date("Y-m-d H:i:s")."] Failed to load address: '${address}'\n";
|
||||
return "NULL";
|
||||
}
|
||||
|
||||
function reverseGeocode($db, $lat, $lng) {
|
||||
// Reverse geocode using local DB
|
||||
list($res,$err) = GeocodeReverseGeocode($db, $lat, $lng);
|
||||
if (is_array($res) && isset($res["address"])) {
|
||||
return [$res["address"],NULL];
|
||||
}
|
||||
// Reverse geocode service using Google Maps API
|
||||
list($res,$err) = reverseGeocodeService($db, $lat, $lng);
|
||||
if (is_array($res) && isset($res["address"]) && $res["address"]!="") {
|
||||
reverseGeocodeServiceSave($db, $res);
|
||||
return [$res["address"],NULL];
|
||||
}
|
||||
return [NULL, $err];
|
||||
}
|
||||
|
||||
function GeocodeReverseGeocode($db, $lat, $lng) {
|
||||
$db_lat = ((float)$lat);
|
||||
$db_lng = ((float)$lng);
|
||||
// Check our local address table with precisions 5 to 3
|
||||
for ($i=5;$i>2;$i--) {
|
||||
$q = "SELECT * FROM address WHERE round(latitude,${i})=round(${db_lat},${i}) AND round(longitude,${i})=round(${db_lng},${i})";
|
||||
$r = pg_query($db, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
|
||||
return [$f, NULL];
|
||||
}
|
||||
}
|
||||
// Check our local Singapore table with precisions 5 to 3
|
||||
/* for ($i=5;$i>2;$i--) {
|
||||
$q = "SELECT * FROM singapore_buildings WHERE round(latitude,${i})=round(${db_lat},${i}) AND round(longitude,${i})=round(${db_lng},${i})";
|
||||
$r = pg_query($q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
|
||||
return [$f, NULL];
|
||||
}
|
||||
} //*/
|
||||
return [NULL, pg_last_error($db)];
|
||||
}
|
||||
|
||||
function reverseGeocodeServiceSave($db, $result) {
|
||||
$db_lat = floatval($result["lat"]);
|
||||
$db_lng = floatval($result["lng"]);
|
||||
$db_address = pg_escape_string($result["address"]);
|
||||
$db_postal = pg_escape_string($result["postal"]);
|
||||
$db_tz = GeocodeGetTimezone($db, $result["timeZoneId"]);
|
||||
// Check the address exists
|
||||
$q = "SELECT id FROM address WHERE lower(address)=lower('${db_address}') ";
|
||||
$q.= "AND latitude<>0 AND longitude<>0 AND geocoding_date IS NOT NULL ";
|
||||
$q.= "ORDER BY geocoding_date DESC LIMIT 1";
|
||||
//error_log($q);
|
||||
$r = pg_query($db, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
||||
return $f[0];
|
||||
}
|
||||
$q = "INSERT INTO address (address,latitude,longitude,timezone,geocoding_date,postal) VALUES (";
|
||||
$q.= "'${db_address}',${db_lat},${db_lng},".($db_tz==NULL?"NULL":$db_tz).",now(),'${db_postal}')";
|
||||
$q.= " RETURNING id";
|
||||
$r = pg_query($db, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
||||
return $f[0];
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
function GeocodeGetTimezone($db, $timezone) {
|
||||
$db_timezone = pg_escape_string($timezone);
|
||||
$q = "SELECT id FROM address_timezone WHERE timezone='${db_timezone}'";
|
||||
$r = pg_query($db, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
||||
return $f[0];
|
||||
}
|
||||
$q = "INSERT INTO address_timezone (timezone) VALUES('${db_timezone}') RETURNING id";
|
||||
$r = pg_query($db, $q);
|
||||
if ($r && pg_num_rows($r) && $f=pg_fetch_row($r)) {
|
||||
return $f[0];
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
function reverseGeocodeService($db, $lat, $lng) {
|
||||
global $httpAuthToken;
|
||||
$data = http_build_query(
|
||||
array(
|
||||
'lat' => $lat,
|
||||
'lng' => $lng
|
||||
)
|
||||
);
|
||||
$url = "http://oauth2.service/api/v1/reverse?" . $data;
|
||||
//echo "\n\n${url}\n\n";
|
||||
$opts = array(
|
||||
'http' => array(
|
||||
'method' => "GET",
|
||||
'timeout' => 60, /* 1 minute */
|
||||
'header' =>
|
||||
"Content-Type: application/x-www-form-urlencoded\r\n" .
|
||||
"Accept: application/json\r\n" .
|
||||
"Authorization: Server-Token ${httpAuthToken}\r\n",
|
||||
)
|
||||
);
|
||||
$context = stream_context_create($opts);
|
||||
$body = file_get_contents($url, false, $context);
|
||||
//error_log("BODY: ".$body);
|
||||
$geocoded = json_decode($body,true);
|
||||
if (is_array($geocoded) && is_array($geocoded["data"]) && !isset($geocoded["error"])) {
|
||||
//error_log(json_encode($geocoded));
|
||||
// Cache the result in DB
|
||||
/*Geocode::saveDistanceCache(
|
||||
$db->getConnect(), $fromLat, $fromLng, $toLat, $toLng, $geocoded["data"]);
|
||||
*/
|
||||
return [$geocoded["data"], NULL];
|
||||
} else if (is_array($geocoded) && isset($geocoded["error"])) {
|
||||
$body = $geocoded["error"];
|
||||
}
|
||||
return [NULL, "Reverse geocoding service call error: ".$body];
|
||||
}
|
||||
|
||||
pg_close($readOnlyReplicaConn);
|
||||
pg_close($gpsconn);
|
||||
echo "[".date("Y-m-d H:i:s")."] GPS members_tracking_trips job complete.\n";
|
||||
Reference in New Issue
Block a user