cfgReadChar('gpsdatabase.host'); $db_name = $savvyext->cfgReadChar('gpsdatabase.name'); $db_user = $savvyext->cfgReadChar('gpsdatabase.user'); $db_pass = $savvyext->cfgReadChar('gpsdatabase.pass'); $db_port = $savvyext->cfgReadLong('gpsdatabase.port'); $gpsconnstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}"; $gpsconn = pg_connect($gpsconnstr); /***** load configuration *****/ $httpAuthToken = $savvyext->cfgReadChar('system.oauth2_token'); $encryptionAlg = $savvyext->cfgReadChar('encryption.algorithm'); $encryptionKey = $savvyext->cfgReadChar('encryption.key'); $encryptionIV = $savvyext->cfgReadChar('encryption.iv'); $baseURL = $savvyext->cfgReadChar('system.api_url'); $googleApiKey = $savvyext->cfgReadChar('google.api_key'); /***** definition *****/ define('STOPPED', 'stopped'); define('WALKING', 'walking'); define('DRIVING_AND_PUBLIC_TRANSPORT', 'driving_and_public_tranport'); define('UNKNOWN', 'unknown'); $segment_duration_limit = 1; // min $trip_limit = 20; $needed_trip_limit = 18; // km/h $walking_low_avg_speed = 3; $walking_high_avg_speed = 10; $low_avg_speed = 10; $high_avg_speed = 160; /***** start worker to process a job *****/ $worker = new GearmanWorker(); $worker->addServer('127.0.0.1', 4730); $worker->addFunction('gps_trips_worker', function (GearmanJob $job) { $member_id = $job->workload(); processMemberGPSTrips($member_id); echo "Waiting for job...\n"; }); while ($worker->work()) { if ($worker->returnCode() != GEARMAN_SUCCESS) { echo "return_code: " . $worker->returnCode() . "\n"; break; } } pg_close($gpsconn); echo "[" . date("Y-m-d H:i:s") . "] GPS trips job complete.\n"; /********************* Functions *********************/ function processMemberGPSTrips(int $member_id) { global $gpsconn, $high_avg_speed, $segment_duration_limit, $trip_limit; $lastProcessedDatetime = getLastProcessedDatetime($member_id); if ($lastProcessedDatetime === NULL) { echo 'There something wrong with member_id = '.$member_id; return; } $whereQuery = ''; if ($lastProcessedDatetime !== '') { $whereQuery .= " AND ttime > '" . $lastProcessedDatetime . "' "; } // and take the data not newer than 24 hrs $track_query = " SELECT id as members_tracking_id, member_id, lat, lng, ttime as time FROM members_tracking WHERE member_id = $member_id AND ttime <= (now() - INTERVAL '24 HOURS') " . $whereQuery . " ORDER BY ttime ASC "; $track_res = pg_query($gpsconn, $track_query); if (!$track_res) { echo pg_last_error($gpsconn); } if ($track_res && pg_num_rows($track_res) > 0) { $first_data = NULL; $last_data = NULL; $dataset = []; $rowCount = 0; $i = 0; $tripStartIndex = 'NONE'; echo "[" . date("Y-m-d H:i:s") . "] Processing member_id=" . $member_id . " . Waiting for a moment.\n"; while ($row = pg_fetch_array($track_res)) { if ($first_data === NULL) { $first_data = $row; } $last_data = $row; $avg_speed_data = calculateAvgSpeed($first_data, $last_data); $trip_segment = [ 'members_tracking_id' => $last_data['members_tracking_id'], 'member_id' => $last_data['member_id'], 'lat' => $last_data['lat'], 'lng' => $last_data['lng'], 'geo' => calculateGeo($last_data['lat'], $last_data['lng']), 'time' => $last_data['time'], 'duration' => $avg_speed_data['duration'], 'distance' => $avg_speed_data['distance'], 'avg_speed' => $avg_speed_data['avg_speed'], 'trip_detection_1' => 'NO-TRIP', // initialize it as NO_TRIP, then we will update it later 'trip_detection_2' => 'NO-TRIP', // initialize it as NO_TRIP, then we will update it later ]; // calculate trip_detection_1 base on duration or avg speed // if duration of a segement > $segment_duration_limit minute // OR avg speed of a segment > $high_avg_speed km/h => segment may not belongs to a trip if ($avg_speed_data['duration'] > $segment_duration_limit || $avg_speed_data['duration'] <= 0 || $avg_speed_data['avg_speed'] > $high_avg_speed ) { $trip_segment['trip_detection_1'] = 'NO-TRIP'; } else { $trip_segment['trip_detection_1'] = 'TRIP'; } $dataset[] = $trip_segment; $rowCount++; $first_data = $last_data; // calculate trip_detection_2 base on trip_detection_1 if ($rowCount === $trip_limit) { detectTrips($dataset, $tripStartIndex, $i); $i++; $rowCount--; } } } pg_free_result($track_res); } function getLastProcessedDatetime(int $member_id) { global $gpsconn; $q = " SELECT travel_date_end FROM gps_trips WHERE member_id = $member_id ORDER BY travel_date_end DESC LIMIT 1 "; $lastProcessedDatetime = NULL; $r = pg_query($gpsconn, $q); if (!$r) { echo pg_last_error($gpsconn); } if ($r && pg_num_rows($r) > 0) { $data = pg_fetch_object($r, 0); $lastProcessedDatetime = $data->travel_date_end; } else { $lastProcessedDatetime = ''; } return $lastProcessedDatetime; } function detectTrips(array &$dataset, &$tripStartIndex, int &$i) { global $needed_trip_limit, $trip_limit; // calculate trip_detection_2 base on trip_detection_1 $j = $i; $segmentCount = 0; while ($j < $i + $trip_limit) { if ($dataset[$j]['trip_detection_1'] === 'TRIP') { $segmentCount++; } $j++; } if ($segmentCount >= $needed_trip_limit) { $dataset[$i]['trip_detection_2'] = 'TRIP'; } // detect a trip based on trip_detection_2 if ($dataset[$i]['trip_detection_2'] === 'TRIP' && $tripStartIndex === 'NONE') { $tripStartIndex = $i; } if ($tripStartIndex !== 'NONE' && $dataset[$i]['trip_detection_2'] !== 'TRIP') { $trip_data = array_slice($dataset, $tripStartIndex, ($i - 1) - $tripStartIndex + 1); processEachTrip($trip_data); // remove rows that were calculated for a detected trip, reduce memory array_splice($dataset, 0, ($i - 1) - 0 + 1); $i = 0; $tripStartIndex = 'NONE'; } } function processEachTrip(array $trip_data) { global $walking_low_avg_speed, $walking_high_avg_speed, $low_avg_speed, $high_avg_speed; // detect legs of a trip $cumulative_duration_total = 0; $cumulative_distance_total = 0; foreach ($trip_data as $key => $item) { // note: in case first item, cumulative_duration and cumulative_distance will be 0 if ($key != 0) { $cumulative_duration_total += $item['duration']; $cumulative_distance_total += $item['distance']; } // km/h $cumulative_avg_speed = $cumulative_duration_total > 0 ? round($cumulative_distance_total / $cumulative_duration_total * 60, 2) : 0; $trip_data[$key]['cumulative_duration'] = $cumulative_duration_total; $trip_data[$key]['cumulative_distance'] = $cumulative_distance_total; $trip_data[$key]['cumulative_avg_speed'] = $cumulative_avg_speed; $trip_data[$key]['transport_mode'] = ''; // how to detect transport mode if ($cumulative_avg_speed < $walking_low_avg_speed) { $trip_data[$key]['transport_mode'] = STOPPED; } else if ($cumulative_avg_speed >= $walking_low_avg_speed && $cumulative_avg_speed <= $walking_high_avg_speed) { $trip_data[$key]['transport_mode'] = WALKING; } else if ($cumulative_avg_speed >= $low_avg_speed && $cumulative_avg_speed <= $high_avg_speed) { $trip_data[$key]['transport_mode'] = DRIVING_AND_PUBLIC_TRANSPORT; } else if ($cumulative_avg_speed > $high_avg_speed) { $trip_data[$key]['transport_mode'] = UNKNOWN; } } $gps_trip_id = insertATripIntoDatabase($trip_data); splitLegsFromATrip($gps_trip_id, $trip_data); } function insertATripIntoDatabase(array $trip_data): int { global $gpsconn; if (count($trip_data) <= 0) { return 0; } $query = " INSERT INTO gps_trips(member_id, travel_date, travel_date_end, duration, distance, location_start_lat, location_start_lng, location_start, location_start_id, location_end_lat, location_end_lng, location_end, location_end_id, avg_speed, start_members_tracking_id, last_members_tracking_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING id; "; $uniq = 'insert_gps_trips_' . uniqid(rand(), true); pg_prepare($gpsconn, $uniq, $query); $first_item = $trip_data[0]; $last_item = end($trip_data); $avg_speed_data = calculateAvgSpeed($first_item, $last_item); $duration_total = round($avg_speed_data['duration'], 2); $distance_total = round($avg_speed_data['distance'], 2); $avg_speed = round($avg_speed_data['avg_speed'], 2); $res = pg_execute($gpsconn, $uniq, [ $first_item['member_id'], $first_item['time'], $last_item['time'], $duration_total, $distance_total, $first_item['lat'], $first_item['lng'], $first_item['geo'], getAddressId($first_item['lat'], $first_item['lng']), $last_item['lat'], $last_item['lng'], $last_item['geo'], getAddressId($last_item['lat'], $last_item['lng']), $avg_speed, $first_item['members_tracking_id'], $last_item['members_tracking_id'] ]); if ($res) { $id = (int) pg_fetch_result($res, 0); return $id; } else { echo pg_last_error($gpsconn); } return 0; } function splitLegsFromATrip(int $gps_trip_id, array $trip_data) { $trip_data_count = count($trip_data); if ($trip_data_count <= 0) { return; } $transport_mode = ''; $change_point_index = 0; $array_key_last = array_keys($trip_data)[$trip_data_count - 1]; foreach ($trip_data as $key => $item) { $leg_last_index = $key; if ((isset($trip_data[$leg_last_index + 1]['transport_mode']) && $trip_data[$leg_last_index]['transport_mode'] != $trip_data[$leg_last_index + 1]['transport_mode']) || ($array_key_last == $leg_last_index) ) { // precess save for each leg $leg_start_index = $change_point_index == 0 ? 0 : $change_point_index - 1; $leg_data = array_slice($trip_data, $leg_start_index, $leg_last_index - $leg_start_index + 1); insertALegIntoDatabase($gps_trip_id, $leg_data, $trip_data[$leg_last_index]['transport_mode']); $change_point_index = $leg_last_index + 1; } } } function insertALegIntoDatabase(int $gps_trip_id, array $trip_data, string $transport_mode) { global $gpsconn; if (count($trip_data) <= 0) { return; } $query = " INSERT INTO gps_trips_legs(member_id, gps_trip_id, travel_date, travel_date_end, duration, distance, location_start_lat, location_start_lng, location_start, location_start_id, location_end_lat, location_end_lng, location_end, location_end_id, avg_speed, transport_mode, start_members_tracking_id, last_members_tracking_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18); "; $uniq = 'insert_gps_trips_legs_' . uniqid(rand(), true); pg_prepare($gpsconn, $uniq, $query); $first_item = $trip_data[0]; $last_item = end($trip_data); $avg_speed_data = calculateAvgSpeed($first_item, $last_item); $duration_total = round($avg_speed_data['duration'], 2); $distance_total = round($avg_speed_data['distance'], 2); $avg_speed = round($avg_speed_data['avg_speed'], 2); $res = pg_execute($gpsconn, $uniq, [ $first_item['member_id'], $gps_trip_id, $first_item['time'], $last_item['time'], $duration_total, $distance_total, $first_item['lat'], $first_item['lng'], $first_item['geo'], getAddressId($first_item['lat'], $first_item['lng']), $last_item['lat'], $last_item['lng'], $last_item['geo'], getAddressId($last_item['lat'], $last_item['lng']), $avg_speed, $transport_mode, $first_item['members_tracking_id'], $last_item['members_tracking_id'] ]); if (!$res) { echo pg_last_error($gpsconn); } } function calculateGeo($lat, $lng) { global $gpsconn; $q = " SELECT ST_SetSRID(ST_MakePoint('" . $lng . "','" . $lat . "'), 4326)::geography as geo "; $r = pg_query($gpsconn, $q); if (!$r) { echo pg_last_error($gpsconn); } if ($r && pg_num_rows($r) > 0) { $data = pg_fetch_object($r, 0); return $data->geo; } return null; } function calculateAvgSpeed(array $first_data, array $last_data): array { $start_date = strtotime($first_data['time']); $end_date = strtotime($last_data['time']); $duration = abs($start_date - $end_date); $duration = $duration / 60; // convert sec to min $distance = sqrt(pow(abs(($last_data['lng'] - $first_data['lng']) * 111), 2) + pow(abs(($last_data['lat'] - $first_data['lat']) * 111), 2)); $avg_speed = ($duration > 0) ? ($distance / $duration * 60) : 0; // convert km/m to km/h return [ 'duration' => $duration, 'distance' => $distance, 'avg_speed' => $avg_speed, ]; } function getAddressId(float $lat, float $lng) : int { $reverse_data = reverseGeocode($lat, $lng); if (empty($reverse_data)) { return 0; } list($fAddress, $err) = geocode($reverse_data['address_name'], $reverse_data['country_short_name']); if (!is_array($fAddress) || !array_key_exists('geocode',$fAddress) || !is_array($fAddress['geocode']) || !array_key_exists('id',$fAddress['geocode']) || $fAddress['geocode']['id']<1) { // Failed to geocode from address return 0; } return $fAddress['geocode']['id']; } function geocode(string $address_name, string $country_code, int $member_id=0) : array{ global $encryptionAlg, $encryptionKey, $encryptionIV, $baseURL, $httpAuthToken; $in = [ 'address' => $address_name, 'member_id' => $member_id, 'country' => $country_code ]; $data = http_build_query($in); $url = $baseURL . "/trips/api/geocode/?".$data; $ch = curl_init($url); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "GET"); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_VERBOSE, false); curl_setopt($ch, CURLOPT_HTTPHEADER, array( 'Content-Type: application/json', 'Authorization: Server-Token ' . $httpAuthToken, "client_id: BATCH" ) ); $body = curl_exec($ch); $result = json_decode($body,true); $payload = openssl_decrypt( hex2bin( $result['payload'] ), $encryptionAlg, $encryptionKey, OPENSSL_RAW_DATA, $encryptionIV ); return [json_decode($payload,true),$body]; } function reverseGeocode(float $lat, float $lng) : array { // LINK: https://developers.google.com/maps/documentation/geocoding/start#reverse global $googleApiKey; $url = "https://maps.googleapis.com/maps/api/geocode/json?latlng=${lat},${lng}&key=${googleApiKey}"; $ch = curl_init($url); curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'GET'); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_VERBOSE, false); $body = curl_exec($ch); $res = json_decode($body); $country_short_name = ''; $address_name = ''; if (isset($res->status) && $res->status === 'OK') { $address_components = $res->results[0]->address_components; $address_name = $res->results[0]->formatted_address; for ($i = 0;$i < count($address_components);$i++) { $cn = array($address_components[$i]->types[0]); if (in_array("country", $cn)) { $country_short_name = $address_components[$i]->short_name; } } } if (!empty($country_short_name) && !empty($address_name)) { return [ 'country_short_name' => $country_short_name, 'address_name' => $address_name, ]; } return []; } // export for DEBUG function exportCsv(array $data) { $fp = fopen('/Users/inmac/Sites/FloatAdmin/adminsavvy/CRONS/transportation_mode_detection/trip.csv', 'a+'); $data = array_map(function ($el) { return [ $el['members_tracking_id'], $el['lng'], $el['lat'], $el['time'], round($el['duration'], 2), round($el['distance'], 2), round($el['avg_speed'], 2), $el['trip_detection_1'], $el['trip_detection_2'], round($el['cumulative_duration'], 2), round($el['cumulative_distance'], 2), round($el['cumulative_avg_speed'], 2), $el['transport_mode'], ]; }, $data); array_unshift($data, [ 'members_tracking_id', 'lng', 'lat', 'time', 'duration', 'distance', 'avg_speed', 'trip_detection_1', 'trip_detection_2', 'cumulative_duration', 'cumulative_distance', 'cumulative_avg_speed', 'transport_mode', ]); foreach ($data as $row) { fputcsv($fp, $row); } fclose($fp); }