cfgReadChar('system.oauth2_token');
// NOTE(review): the statement above begins before this section. It presumably
// assigns $httpAuthToken, which reverseGeocodeService() reads as a global — confirm.

// ---------------------------------------------------------------------------
// Database connections
// ---------------------------------------------------------------------------

// Connection passed to the reverse-geocoding helpers (address / address_timezone tables).
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');
$readOnlyReplicaConnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);
if (!$readOnlyReplicaConn) {
    // Every later pg_* call needs a live connection; stop instead of failing on each query.
    echo "[".date("Y-m-d H:i:s")."] Failed to connect to database_replica\n";
    exit(1);
}

// Connection to the GPS database (members_tracking* tables).
$db_host = $savvyext->cfgReadChar('gpsdatabase.host');
$db_name = $savvyext->cfgReadChar('gpsdatabase.name');
$db_user = $savvyext->cfgReadChar('gpsdatabase.user');
$db_pass = $savvyext->cfgReadChar('gpsdatabase.pass');
$db_port = $savvyext->cfgReadLong('gpsdatabase.port');
$gpsconnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$gpsconn = pg_connect($gpsconnstr);
if (!$gpsconn) {
    echo "[".date("Y-m-d H:i:s")."] Failed to connect to gpsdatabase\n";
    exit(1);
}

// ---------------------------------------------------------------------------
// Main loop: process every member that has tracking records
// ---------------------------------------------------------------------------

$q = "select member_id,count(*) from members_tracking ";
//$q.= "where member_id=55 "; // DEBUG
$q.= "group by member_id";
$r = pg_query($gpsconn, $q);
if ($r && pg_num_rows($r)) {
    while ($f = pg_fetch_row($r)) {
        $member_id = $f[0];
        echo "[".date("Y-m-d H:i:s")."] Processing member_id=".$member_id." (total member records: ".$f[1].")\n";
        $p = processMemberGPSTrackingTrips($member_id);
        echo "[".date("Y-m-d H:i:s")."] Processed member_id=".$member_id." (processed records: ".$p.")\n";
    }
}
if ($r) {
    pg_free_result($r);
}

/**
 * Groups a member's not-yet-processed tracking segments into trips and saves them.
 *
 * Segments are read from members_tracking_duration (ordered by device and time),
 * starting after the highest location id already stored in members_tracking_trips.
 * A trip is closed when a segment's inner_duration reaches $stop_limit, or when the
 * device changes while a trip is open.
 *
 * Reads the globals $gpsconn, $hard_limit (maximum rows per run) and $stop_limit.
 *
 * @param int|string $member_id Member whose segments are processed.
 * @return int Number of segment rows read (0 when there is nothing to process).
 */
function processMemberGPSTrackingTrips($member_id) {
    global $gpsconn,$hard_limit,$stop_limit;

    // Cast once so the value is safe to bind and log.
    $member_id = (int)$member_id;

    echo "[".date("Y-m-d H:i:s")."] processMemberGPSDataPartition($member_id)\n";

    // Highest location id already covered by a stored trip for this member.
    $id = 0;
    $r = pg_query_params(
        $gpsconn,
        'select max(location_start),max(location_end) from members_tracking_trips where member_id=$1',
        [$member_id]
    );
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        $id = max((int)$f[0], (int)$f[1]);
    }
    if ($r) {
        pg_free_result($r);
    }

    echo "[".date("Y-m-d H:i:s")."] Last processed ID is {$id}\n";

    // "ms" is derived from the epoch: EXTRACT(milliseconds ...) yields only the
    // seconds field of an interval (times 1000), which under-counts any segment
    // lasting a minute or more.
    $q = 'select a.first_id,a.last_id,a.device_id,extract(epoch from a.duration)*1000 as ms, ';
    $q.= 'extract(epoch from a.duration) as seconds, c.speed, a.inner_duration, a.total, ';
    $q.= 'ST_Distance(c.gps,b.gps) AS distance, b.gps AS gps_start, c.gps AS gps_end,';
    $q.= 'EXTRACT(epoch FROM c.ttime-b.ttime) AS travel_time ';
    $q.= 'from members_tracking_duration a ';
    $q.= 'left join members_tracking b on (b.id=a.first_id) ';
    $q.= 'left join members_tracking c on (c.id=a.last_id) ';
    $q.= 'where a.member_id=$1 ';
    $q.= 'and a.first_id>$2 AND a.last_id>$2 ';
    $q.= 'order by a.device_id, a.first_time limit $3';

    // The query runs once; the same result is used for the count and the iteration.
    $r = pg_query_params($gpsconn, $q, [$member_id, $id, (int)$hard_limit]);
    if (!$r || !pg_num_rows($r)) {
        if (!$r) {
            echo "[".date("Y-m-d H:i:s")."] Query failed: ".pg_last_error($gpsconn)."\n";
        } else {
            pg_free_result($r);
        }
        echo "[".date("Y-m-d H:i:s")."] No new records to process. Stop.\n";
        return 0;
    }
    echo "[".date("Y-m-d H:i:s")."] Records to process: ".pg_num_rows($r)."\n";

    echo "[".date("Y-m-d H:i:s")."] Processing";

    $empty_trip_data = ['distance'=>0,'duration'=>0,'total'=>0,'speed'=>0];
    $trip = [];                     // segment rows of the trip being built
    $trip_data = $empty_trip_data;  // running totals for that trip
    $processed = 0;
    $last_device_id = -1;

    while ($f = pg_fetch_assoc($r)) {
        $is_stop = $f["inner_duration"] >= $stop_limit;
        $device_changed = ($last_device_id != $f["device_id"] && count($trip) > 0);

        if ($is_stop || $device_changed) {
            // NOTE(review): the row that closes a trip is not added to any trip,
            // including the first row of a new device — confirm this is intended.
            if (count($trip) > 0) {
                // Only save when rows were accumulated; an empty trip has no
                // first/last row and could only produce a failure log entry.
                list($res, $err) = processMemberGPSTrackingTripsSave(
                    $member_id, $trip[0], $trip[count($trip) - 1], $trip_data);
                if ($res && $res["id"] > 0) {
                    echo "[".date("Y-m-d H:i:s")."] New trip processed: ".$res["id"]." (member_id={$member_id})\n";
                } else {
                    echo "[".date("Y-m-d H:i:s")."] Failed to process trip: {$err} (member_id={$member_id})\n";
                    var_dump($trip_data); // DEBUG
                }
            }
            $trip = [];
            $trip_data = $empty_trip_data;
        } else {
            $segment_ms = (float)$f["ms"];
            $segment_distance = (float)$f["distance"];
            $trip[] = $f;
            $trip_data['distance'] += $segment_distance;
            $trip_data['duration'] += $segment_ms;
            // km/h — assumes ST_Distance returns metres; TODO confirm the gps column type
            $trip_data['speed'] += $segment_ms > 0 ? (3600 * $segment_distance / $segment_ms) : 0;
            $trip_data['total'] += 1;
        }
        $last_device_id = $f["device_id"];
        $processed++;
    }
    // NOTE(review): rows still in $trip here are not saved. Their ids stay above the
    // stored maximum, so they are read again on the next run — confirm this is intended.
    pg_free_result($r);

    return $processed;
}

/**
 * Inserts or updates one row in members_tracking_trips.
 *
 * The trip is identified by member, the last_id of its first and last segment and
 * the device. Start/end addresses are resolved through
 * processMemberGPSTrackingTripsReverseGeocode().
 *
 * @param int|string $member_id Member the trip belongs to.
 * @param array      $first     First segment row of the trip.
 * @param array      $last      Last segment row of the trip.
 * @param array      $trip_data Totals with keys distance, duration (ms), total, speed.
 * @return array [row, NULL] with the stored row on success, [NULL, error message] otherwise.
 */
function processMemberGPSTrackingTripsSave($member_id,$first,$last,$trip_data) {
    global $gpsconn;

    $distance = $trip_data['distance'];
    $duration_ms = $trip_data['duration'];
    $duration = $duration_ms / 1000; // stored in seconds
    if (!($distance > 0 && $duration > 0)) {
        return [NULL, "Distance and/or duration are invalid!"];
    }

    $speed = 3600 * $distance / $duration_ms;
    $segments = $trip_data['total'];
    $avg_speed = $segments > 0 ? $trip_data['speed'] / $segments : 0;

    // Integer parts of the key are cast so they are safe to place in the SQL text.
    $member_id = (int)$member_id;
    $device_id = ($first['device_id'] > 0 ? (int)$first['device_id'] : NULL);
    $location_start = (int)$first['last_id'];
    $location_end = (int)$last['last_id'];
    $gps_start = $first['gps_end'];
    $gps_end = $last['gps_end'];

    // The geocode helper returns an address id, or the literal string "NULL";
    // translate the latter into a real NULL for parameter binding.
    $address_start = processMemberGPSTrackingTripsReverseGeocode($gps_start);
    $address_end = processMemberGPSTrackingTripsReverseGeocode($gps_end);
    if ($address_start === "NULL") {
        $address_start = NULL;
    }
    if ($address_end === "NULL") {
        $address_end = NULL;
    }

    $device_sql = ($device_id === NULL ? "NULL" : (string)$device_id);
    $device_clause = ($device_id === NULL ? "device_id IS NULL" : "device_id={$device_id}");
    $key = "member_id={$member_id} AND location_start={$location_start} "
         . "AND location_end={$location_end} AND {$device_clause}";

    $exists = false;
    $r = pg_query($gpsconn, "SELECT 1 FROM members_tracking_trips WHERE {$key} LIMIT 1");
    if ($r) {
        $exists = pg_num_rows($r) > 0;
        pg_free_result($r);
    }

    // Values shared by UPDATE and INSERT are bound as $1..$8 so both statements
    // treat a missing address the same way (SQL NULL).
    $params = [$distance, $duration, $speed, $avg_speed, $gps_start, $gps_end, $address_start, $address_end];
    if ($exists) {
        $q = 'UPDATE members_tracking_trips SET distance=$1,duration=$2,speed=$3,avg_speed=$4,';
        $q.= 'gps_start=$5,gps_end=$6,address_start=$7,address_end=$8 ';
        $q.= "WHERE {$key} RETURNING *";
    } else {
        $q = 'INSERT INTO members_tracking_trips (distance,duration,speed,avg_speed,gps_start,gps_end,';
        $q.= 'address_start,address_end,member_id,location_start,location_end,device_id) VALUES(';
        $q.= '$1,$2,$3,$4,$5,$6,$7,$8,';
        $q.= "{$member_id},{$location_start},{$location_end},{$device_sql}";
        $q.= ') RETURNING *';
    }

    $r = pg_query_params($gpsconn, $q, $params);
    if ($r && pg_num_rows($r) && $f = pg_fetch_assoc($r)) {
        pg_free_result($r);
        return [$f, NULL];
    }

    echo "\n$q\n";
    return [NULL, pg_last_error($gpsconn)];
}

/**
 * Resolves a stored GPS point to an id from the address table.
 *
 * Decodes the point into latitude/longitude on the GPS connection, reverse-geocodes
 * it, then looks the resulting address text up in the address table.
 *
 * @param string|null $gps GPS value as returned by the tracking queries.
 * @return string Address id, or the literal string "NULL" when it cannot be resolved.
 */
function processMemberGPSTrackingTripsReverseGeocode($gps) {
    global $gpsconn, $readOnlyReplicaConn;

    if ($gps==NULL || $gps=="") {
        echo "[".date("Y-m-d H:i:s")."] Empty GPS string\n";
        return "NULL";
    }

    $r = pg_query_params($gpsconn, 'SELECT ST_Y($1::geometry) AS lat, ST_X($1::geometry) AS lng', [$gps]);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        $lat = $f[0];
        $lng = $f[1];
        pg_free_result($r);
    } else {
        echo "[".date("Y-m-d H:i:s")."] Failed to decode GPS: '{$gps}'\n";
        return "NULL";
    }

    list($address, $err) = reverseGeocode($readOnlyReplicaConn, $lat, $lng);
    if (!$address) {
        // No address means nothing to look up, whether or not an error text came back.
        echo "[".date("Y-m-d H:i:s")."] Failed to reverse geocode GPS: {$lat},{$lng}\n";
        return "NULL";
    }

    $q = 'SELECT id FROM address WHERE lower(address)=lower($1::text) ORDER BY geocoding_date DESC LIMIT 1';
    $r = pg_query_params($readOnlyReplicaConn, $q, [$address]);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    echo "[".date("Y-m-d H:i:s")."] Failed to load address: '{$address}'\n";
    return "NULL";
}

/**
 * Reverse-geocodes a coordinate: local address table first, then the HTTP service.
 *
 * A service result with a non-empty address is stored through
 * reverseGeocodeServiceSave() before it is returned.
 *
 * @param resource|object $db  PostgreSQL connection holding the address table.
 * @param float|string    $lat Latitude.
 * @param float|string    $lng Longitude.
 * @return array [address, NULL] on success, [NULL, error message] otherwise.
 */
function reverseGeocode($db, $lat, $lng) {
    // Reverse geocode using local DB
    list($res, $err) = GeocodeReverseGeocode($db, $lat, $lng);
    if (is_array($res) && isset($res["address"])) {
        return [$res["address"], NULL];
    }

    // Reverse geocode service using Google Maps API
    list($res, $err) = reverseGeocodeService($db, $lat, $lng);
    if (is_array($res) && isset($res["address"]) && $res["address"]!="") {
        // NOTE(review): $db is the connection the script names "read-only replica";
        // confirm that inserts are allowed on it.
        reverseGeocodeServiceSave($db, $res);
        return [$res["address"], NULL];
    }

    // The service can answer without an address and without an error; always hand
    // the caller a non-empty error in that case.
    if (!$err) {
        $err = "Reverse geocoding returned no address";
    }
    return [NULL, $err];
}

/**
 * Looks a coordinate up in the local address table.
 *
 * Compares latitude/longitude rounded to 5, then 4, then 3 decimals and returns the
 * first row found.
 *
 * @param resource|object $db  PostgreSQL connection holding the address table.
 * @param float|string    $lat Latitude.
 * @param float|string    $lng Longitude.
 * @return array [row, NULL] when a row matches, [NULL, last connection error] otherwise.
 */
function GeocodeReverseGeocode($db, $lat, $lng) {
    $db_lat = ((float)$lat);
    $db_lng = ((float)$lng);

    // Check our local address table with precisions 5 to 3
    for ($i = 5; $i > 2; $i--) {
        // Only the first row is used, so one row is all that is requested.
        $q = "SELECT * FROM address WHERE round(latitude,{$i})=round({$db_lat},{$i}) "
           . "AND round(longitude,{$i})=round({$db_lng},{$i}) LIMIT 1";
        $r = pg_query($db, $q);
        if ($r && pg_num_rows($r) && $f = pg_fetch_assoc($r)) {
            pg_free_result($r);
            return [$f, NULL];
        }
        if ($r) {
            pg_free_result($r);
        }
    }

    // Check our local Singapore table with precisions 5 to 3 (disabled)
    /*
    for ($i=5;$i>2;$i--) {
        $q = "SELECT * FROM singapore_buildings WHERE round(latitude,${i})=round(${db_lat},${i}) AND round(longitude,${i})=round(${db_lng},${i})";
        $r = pg_query($q);
        if ($r && pg_num_rows($r) && $f=pg_fetch_assoc($r)) {
            return [$f, NULL];
        }
    }
    //*/

    return [NULL, pg_last_error($db)];
}

/**
 * Stores a reverse-geocoding service result in the address table.
 *
 * Returns the id of an existing geocoded row with the same address text
 * (case-insensitive) when there is one; otherwise inserts a new row.
 *
 * @param resource|object $db     PostgreSQL connection holding the address table.
 * @param array           $result Service data; keys read: lat, lng, address, postal, timeZoneId.
 * @return string|null Address id, or NULL when the insert did not return a row.
 */
function reverseGeocodeServiceSave($db, $result) {
    // Optional keys are guarded so a partial service answer does not raise warnings.
    $db_lat = isset($result["lat"]) ? floatval($result["lat"]) : 0.0;
    $db_lng = isset($result["lng"]) ? floatval($result["lng"]) : 0.0;
    $address = isset($result["address"]) ? (string)$result["address"] : "";
    $postal = isset($result["postal"]) ? (string)$result["postal"] : "";
    $db_tz = GeocodeGetTimezone($db, isset($result["timeZoneId"]) ? $result["timeZoneId"] : NULL);

    // Check the address exists
    $q = 'SELECT id FROM address WHERE lower(address)=lower($1::text) ';
    $q.= 'AND latitude<>0 AND longitude<>0 AND geocoding_date IS NOT NULL ';
    $q.= 'ORDER BY geocoding_date DESC LIMIT 1';
    $r = pg_query_params($db, $q, [$address]);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    $q = 'INSERT INTO address (address,latitude,longitude,timezone,geocoding_date,postal) VALUES (';
    $q.= '$1,$2,$3,$4,now(),$5)';
    $q.= ' RETURNING id';
    $r = pg_query_params($db, $q, [$address, $db_lat, $db_lng, ($db_tz == NULL ? NULL : $db_tz), $postal]);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    return NULL;
}

/**
 * Returns the id of a timezone name in address_timezone, inserting it when missing.
 *
 * @param resource|object $db       PostgreSQL connection holding address_timezone.
 * @param string|null     $timezone Timezone identifier from the geocoding service.
 * @return string|null Timezone id, or NULL for an empty name or a failed insert.
 */
function GeocodeGetTimezone($db, $timezone) {
    // An empty name would otherwise create a blank address_timezone row.
    if ($timezone === NULL || $timezone === "") {
        return NULL;
    }
    $timezone = (string)$timezone;

    $r = pg_query_params($db, 'SELECT id FROM address_timezone WHERE timezone=$1', [$timezone]);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    $r = pg_query_params($db, 'INSERT INTO address_timezone (timezone) VALUES($1) RETURNING id', [$timezone]);
    if ($r && pg_num_rows($r) && $f = pg_fetch_row($r)) {
        pg_free_result($r);
        return $f[0];
    }

    return NULL;
}

/**
 * Calls the HTTP reverse-geocoding service for a coordinate.
 *
 * Sends a GET request authenticated with the global $httpAuthToken and decodes the
 * JSON answer.
 *
 * @param resource|object $db  Unused; kept for signature compatibility.
 * @param float|string    $lat Latitude.
 * @param float|string    $lng Longitude.
 * @return array [data array, NULL] on success, [NULL, error message] otherwise.
 */
function reverseGeocodeService($db, $lat, $lng) {
    global $httpAuthToken;

    $data = http_build_query(
        array(
            'lat' => $lat,
            'lng' => $lng
        )
    );
    $url = "http://oauth2.service/api/v1/reverse?" . $data;

    $opts = array(
        'http' => array(
            'method' => "GET",
            'timeout' => 60, /* 1 minute */
            // Read the body on non-2xx answers too, so a JSON "error" field can be reported.
            'ignore_errors' => true,
            'header' => "Content-Type: application/x-www-form-urlencoded\r\n" .
                        "Accept: application/json\r\n" .
                        "Authorization: Server-Token {$httpAuthToken}\r\n",
        )
    );
    $context = stream_context_create($opts);

    $body = file_get_contents($url, false, $context);
    if ($body === false) {
        // Transport failure (connection refused, timeout, ...): there is no body to decode.
        return [NULL, "Reverse geocoding service call error: request failed"];
    }

    $geocoded = json_decode($body, true);
    if (is_array($geocoded) && isset($geocoded["data"]) && is_array($geocoded["data"]) && !isset($geocoded["error"])) {
        return [$geocoded["data"], NULL];
    } else if (is_array($geocoded) && isset($geocoded["error"])) {
        // The error field may be structured; encode it so concatenation is safe.
        $body = is_string($geocoded["error"]) ? $geocoded["error"] : json_encode($geocoded["error"]);
    }

    return [NULL, "Reverse geocoding service call error: ".$body];
}

pg_close($readOnlyReplicaConn);
pg_close($gpsconn);

echo "[".date("Y-m-d H:i:s")."] GPS members_tracking_trips job complete.\n";