cfgReadChar('database.host'); $db_name = $savvyext->cfgReadChar('database.name'); $db_user = $savvyext->cfgReadChar('database.user'); $db_pass = $savvyext->cfgReadChar('database.pass'); $db_port = $savvyext->cfgReadLong('database.port'); $connstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}"; $con = null; $worker = new GearmanWorker(); $worker->addServers("127.0.0.1:4730"); $worker->addFunction("fetchApiData", function (GearmanJob $job) { global $con, $connstr; $con = pg_connect($connstr) or die("Could not connect to server\n"); $workload = json_decode($job->workload(), true); execute($workload); echo "Waiting for job...\n"; }); while($worker->work()) { if ($worker->returnCode() != GEARMAN_SUCCESS) { echo "return_code: " . $worker->returnCode() . "\n"; break; } } /* ********************** FUNCTION ********************** */ function execute($workload) { $api_account = get_api_account(); if (empty($api_account)) { echo "The API key not found.\n"; exit(); } $data = array_merge(['workload' => $workload], ['api_account' => $api_account]); insert_weather($data); } function insert_weather(array $data) { global $con; $api_service_info = $data['api_account']; $workload = $data['workload']; $weather_api_key = trim($api_service_info['api_key']); $weather_url = trim($api_service_info['url']); $weather_service_name = trim($api_service_info['service_name']); /** workload sample *$workload = * [ * 'root_id' => 32232, * 'root_type' => 1, * 'min_max_date' => [ * 'min_date' => '2016-11-14', * 'max_date' => '2016-11-27' * ], * 'date_list' => [ * '2016-11-14', * '2016-11-18', * '2016-11-27', * ], * 'location_start_lat' => 34.43242342, * 'location_start_lng' => 36.43243342, * 'start_geometry' => '0101000020E6100000DA5E6633629A5EC0052C5ED152E44240', * 'timezone' => 'Asia/Ho_Chi_Minh' * ] * */ $location_start_lat = $workload['location_start_lat']; $location_start_lng = $workload['location_start_lng']; $min_max_date = $workload['min_max_date']; $date_list = $workload['date_list']; $start_geometry = $workload['start_geometry']; $root_id = $workload['root_id']; $root_type = $workload['root_type']; // tp = 1 => get every hour $url = "$weather_url?key=" . $weather_api_key . "&q=" . $location_start_lat . "," . $location_start_lng . "&tp=1&format=json&date=" . $min_max_date['min_date'] . "&enddate=" . $min_max_date['max_date']; // Collect data $response = get_weather_data_from_API($url); $response = format_weatheronline($response, $location_start_lat, $location_start_lng, $start_geometry); if (!empty($response)) { foreach ($date_list as $date) { // retrieve weather data in weather table, if exists => return records, else insert data that was retrieved from API response. $weather_records_for_specific_date = get_weather_data_by_coor_and_date_in_weather_table($date, $location_start_lat, $location_start_lng); if (empty($weather_records_for_specific_date)) { // insert weather data with specific date // With each date => we will insert 24 record (each record is one hour) $insert_query = generate_insert_query($response[$date]); // save hourly data into weather data $rs = pg_query($con, $insert_query); if (!$rs) { echo "Cannot execute query: $insert_query\n"; } $weather_records_for_specific_date = pg_fetch_all($rs); } // link hour to a trip (in trip_weather table) link_trip_and_weather($root_id, $root_type, $weather_records_for_specific_date, $date); echo "[" . date("Y-m-d H:i:s") . "] With DATE = ${date}, COOR($location_start_lat, $location_start_lng) and ROOT_ID = ${root_id}: Insert weather into weather table and link hour to a trip (in trip_weather table) \n"; } } } function get_weather_data_by_coor_and_date_in_weather_table(string $travel_date, float $location_start_lat, float $location_start_lng): array { global $con; $query = " SELECT id, time FROM weather WHERE date = '" . $travel_date . "' AND latitude = " . $location_start_lat . " AND longitude = " . $location_start_lng . " "; $rs = pg_query($con, $query); if (!$rs) { echo "Cannot execute query: $query\n"; } $result = []; while ($rs && $row = pg_fetch_object($rs)) { $result[] = [ 'id' => $row->id, 'time' => $row->time ]; } if (!empty($result)) { echo "[" . date("Y-m-d H:i:s") . "] Travel date = ${travel_date} and COOR(${location_start_lat}, ${location_start_lng}) are already existing in weather table.\n"; } return $result; } function link_trip_and_weather(int $root_id, int $root_type, array $weather_records_for_specific_date, string $date) { global $con; // find the trip/quote that has the same (root_id, root_type and travel_date) in trip_price_comparison table $find_trip_query = " (SELECT tpc.data_source_id, tpc.data_source, pitm.travel_date FROM trip_price_comparison tpc INNER JOIN parsedemail_item pitm ON pitm.id = tpc.data_source_id AND tpc.data_source = 1 AND root_id = ${root_id} AND root_type = ${root_type} WHERE pitm.travel_date IS NOT NULL AND to_char(pitm.travel_date, 'YYYY-MM-DD') = '${date}') UNION ( SELECT tpc.data_source_id, tpc.data_source, q.completed as travel_date FROM trip_price_comparison tpc INNER JOIN quotes q ON q.id = tpc.data_source_id AND tpc.data_source = 2 AND root_id = ${root_id} AND root_type = ${root_type} WHERE q.completed IS NOT NULL AND to_char(q.completed, 'YYYY-MM-DD') = '${date}') "; $rs = pg_query($con, $find_trip_query); if (!$rs) { echo "Cannot execute query: $find_trip_query\n"; } while ($row = pg_fetch_object($rs)) { $travel_date = $row->travel_date; // format: YYYY-MM-DD HH:II:SS // convert $travel_date format from YYYY-MM-DD HH:II:SS to YYYY-MM-DD HH:00:00 $travel_date = substr($travel_date, 0, 13) . ':00:00'; // purpose: to compare travel date with weather date (YYYY-MM-DD HH:00:00) $key = array_search($travel_date, array_column($weather_records_for_specific_date, 'time')); if ($key === FALSE) { continue; } $weather_id = $weather_records_for_specific_date[$key]['id']; $insert_query = "INSERT INTO trip_weather(data_source_id, data_source, weather_id) VALUES ('" . pg_escape_string($row->data_source_id) . "', '" . pg_escape_string($row->data_source) . "', '" . pg_escape_string($weather_id) . "') ON CONFLICT (data_source_id,data_source) DO NOTHING;"; $insert_rs = pg_query($con, $insert_query); if (!$insert_rs) { echo "Cannot execute query: $insert_query\n"; } } } function get_weather_data_from_API($url) { // Get cURL resource $curl = curl_init(); // Set some options - we are passing in a useragent too here curl_setopt_array($curl, [ CURLOPT_RETURNTRANSFER => 1, CURLOPT_URL => $url, CURLOPT_HTTPHEADER, [ 'content-type: application/json', ], ]); // Send the request & save response to $result $result = curl_exec($curl); // Check HTTP status code switch ($http_code = curl_getinfo($curl, CURLINFO_HTTP_CODE)) { case 200: break; case 429: echo "The API key has reached calls per day allowed limit. \n"; echo $result; exit; // terminate worker default: echo "Something went wrong at call API with url: ${url}.\n"; echo $result; exit; // terminate worker } // Close request to clear up some resources curl_close($curl); return json_decode($result); } /** * Get the first api count * @param none * @return array */ function get_api_account() { global $con; $query = "SELECT id, service_name, api_key, url FROM weather_services WHERE active = TRUE LIMIT 1"; $rs = pg_query($con, $query); if (!$rs) { echo "Cannot execute query: $query\n"; } return pg_fetch_all($rs) ? pg_fetch_all($rs)[0] : []; } function format_weatheronline($data, $lat, $long, $geometry) { $weather = $data->data->weather; $response_data = []; foreach ($weather as $weather_by_date) { foreach ($weather_by_date->hourly as $hourly) { // becuase response time format: hmm (1hour: 100) -> we will convert to YYYY-MM-DD H:i:s $hour = $hourly->time / 100; $date = $weather_by_date->date . " ${hour}:00:00"; $date = DateTime::createFromFormat('Y-m-d G:i:s', $date); $date = $date->format('Y-m-d H:i:s'); $response_data[$weather_by_date->date][] = [ 'time' => $date, 'tempC' => $hourly->tempC, 'avgtempC' => $weather_by_date->avgtempC, 'totalSnow_cm' => $weather_by_date->totalSnow_cm, 'windspeedKmph' => $hourly->windspeedKmph, 'weatherCode' => $hourly->weatherCode, 'precipMM' => $hourly->precipMM, 'humidity' => $hourly->humidity, 'visibility' => $hourly->visibility, 'pressure' => $hourly->pressure, 'HeatIndexC' => $hourly->HeatIndexC, 'WindChillC' => $hourly->WindChillC, 'WindGustKmph' => $hourly->WindGustKmph, 'FeelsLikeC' => $hourly->FeelsLikeC, 'date' => $weather_by_date->date, 'location_start_lat' => $lat, 'location_start_lng' => $long, 'start_geometry' => $geometry, 'dewpoint' => 'null', 'precipitation' => 'null', 'precipitation_3' => 'null', 'precipitation_6' => 'null', 'winddirection' => 'null', 'condition' => 'null', ]; } } return $response_data; } function generate_insert_query($data) { $value = []; foreach ($data as $element) { $value[] = "( " . $element['tempC'] . " , " . $element['avgtempC'] . " , " . $element['totalSnow_cm'] . " , '" . $element['windspeedKmph'] . "' , '" . $element['weatherCode'] . "' , '" . $element['precipMM'] . "' , '" . $element['humidity'] . "' , '" . $element['visibility'] . "' , '" . $element['pressure'] . "' , '" . $element['HeatIndexC'] . "' , '" . $element['WindChillC'] . "' , '" . $element['WindGustKmph'] . "' , '" . $element['FeelsLikeC'] . "' , " . $element['location_start_lat'] . " , " . $element['location_start_lng'] . " , '" . $element['start_geometry'] . "' , '" . $element['date'] . "', " . $element['dewpoint'] . " , " . $element['precipitation'] . " , " . $element['precipitation_3'] . " , " . $element['precipitation_6'] . " , " . $element['winddirection'] . " , " . $element['condition'] . " , '" . $element['time'] . "' )"; } $query = "INSERT INTO weather( temp_c, avg_temp_c, total_snow_cm, wind_speed_kmph, weather_code, precip_mm, humidity, visibility, pressure, heat_index_c, wind_chill_c, wind_gust_kmph, feels_like_c, latitude, longitude, geometry, date, dewpoint, precipitation, precipitation_3, precipitation_6, winddirection, condition, time ) VALUES" . implode(',', $value) . ' RETURNING id, time'; return $query; }