Files
FloatBackOfffice/CRONS/weather/update_weather_data_worker.php
dev-chiefworks f76abffdcd first commit
2022-05-31 16:21:53 -04:00

394 lines
12 KiB
PHP

<?php
require '../../backend.php';
$db_host = $savvyext->cfgReadChar('database.host');
$db_name = $savvyext->cfgReadChar('database.name');
$db_user = $savvyext->cfgReadChar('database.user');
$db_pass = $savvyext->cfgReadChar('database.pass');
$db_port = $savvyext->cfgReadLong('database.port');
$connstr = "host=${db_host} port=${db_port} dbname=${db_name} user=${db_user} password=${db_pass}";
$con = null;
$worker = new GearmanWorker();
$worker->addServers("127.0.0.1:4730");
$worker->addFunction("fetchApiData", function (GearmanJob $job) {
global $con, $connstr;
$con = pg_connect($connstr) or die("Could not connect to server\n");
$workload = json_decode($job->workload(), true);
execute($workload);
echo "Waiting for job...\n";
});
while($worker->work())
{
if ($worker->returnCode() != GEARMAN_SUCCESS)
{
echo "return_code: " . $worker->returnCode() . "\n";
break;
}
}
/* ********************** FUNCTION ********************** */
function execute($workload)
{
$api_account = get_api_account();
if (empty($api_account)) {
echo "The API key not found.\n";
exit();
}
$data = array_merge(['workload' => $workload], ['api_account' => $api_account]);
insert_weather($data);
}
function insert_weather(array $data)
{
global $con;
$api_service_info = $data['api_account'];
$workload = $data['workload'];
$weather_api_key = trim($api_service_info['api_key']);
$weather_url = trim($api_service_info['url']);
$weather_service_name = trim($api_service_info['service_name']);
/** workload sample
*$workload =
* [
* 'root_id' => 32232,
* 'root_type' => 1,
* 'min_max_date' => [
* 'min_date' => '2016-11-14',
* 'max_date' => '2016-11-27'
* ],
* 'date_list' => [
* '2016-11-14',
* '2016-11-18',
* '2016-11-27',
* ],
* 'location_start_lat' => 34.43242342,
* 'location_start_lng' => 36.43243342,
* 'start_geometry' => '0101000020E6100000DA5E6633629A5EC0052C5ED152E44240',
* 'timezone' => 'Asia/Ho_Chi_Minh'
* ]
*
*/
$location_start_lat = $workload['location_start_lat'];
$location_start_lng = $workload['location_start_lng'];
$min_max_date = $workload['min_max_date'];
$date_list = $workload['date_list'];
$start_geometry = $workload['start_geometry'];
$root_id = $workload['root_id'];
$root_type = $workload['root_type'];
// tp = 1 => get every hour
$url = "$weather_url?key=" . $weather_api_key . "&q=" . $location_start_lat . "," . $location_start_lng . "&tp=1&format=json&date=" . $min_max_date['min_date'] . "&enddate=" . $min_max_date['max_date'];
// Collect data
$response = get_weather_data_from_API($url);
$response = format_weatheronline($response, $location_start_lat, $location_start_lng, $start_geometry);
if (!empty($response)) {
foreach ($date_list as $date) {
// retrieve weather data in weather table, if exists => return records, else insert data that was retrieved from API response.
$weather_records_for_specific_date = get_weather_data_by_coor_and_date_in_weather_table($date, $location_start_lat, $location_start_lng);
if (empty($weather_records_for_specific_date)) {
// insert weather data with specific date
// With each date => we will insert 24 record (each record is one hour)
$insert_query = generate_insert_query($response[$date]);
// save hourly data into weather data
$rs = pg_query($con, $insert_query);
if (!$rs) {
echo "Cannot execute query: $insert_query\n";
}
$weather_records_for_specific_date = pg_fetch_all($rs);
}
// link hour to a trip (in trip_weather table)
link_trip_and_weather($root_id, $root_type, $weather_records_for_specific_date, $date);
echo "[" . date("Y-m-d H:i:s") . "] With DATE = ${date}, COOR($location_start_lat, $location_start_lng) and ROOT_ID = ${root_id}: Insert weather into weather table
and link hour to a trip (in trip_weather table) \n";
}
}
}
function get_weather_data_by_coor_and_date_in_weather_table(string $travel_date, float $location_start_lat, float $location_start_lng): array
{
global $con;
$query = "
SELECT
id, time
FROM
weather
WHERE
date = '" . $travel_date . "'
AND latitude = " . $location_start_lat . "
AND longitude = " . $location_start_lng . "
";
$rs = pg_query($con, $query);
if (!$rs) {
echo "Cannot execute query: $query\n";
}
$result = [];
while ($rs && $row = pg_fetch_object($rs)) {
$result[] = [
'id' => $row->id,
'time' => $row->time
];
}
if (!empty($result)) {
echo "[" . date("Y-m-d H:i:s") . "] Travel date = ${travel_date} and COOR(${location_start_lat}, ${location_start_lng}) are already existing in weather table.\n";
}
return $result;
}
function link_trip_and_weather(int $root_id, int $root_type, array $weather_records_for_specific_date, string $date)
{
global $con;
// find the trip/quote that has the same (root_id, root_type and travel_date) in trip_price_comparison table
$find_trip_query = "
(SELECT
tpc.data_source_id,
tpc.data_source,
pitm.travel_date
FROM
trip_price_comparison tpc
INNER JOIN parsedemail_item pitm ON pitm.id = tpc.data_source_id
AND tpc.data_source = 1
AND root_id = ${root_id}
AND root_type = ${root_type}
WHERE
pitm.travel_date IS NOT NULL
AND to_char(pitm.travel_date, 'YYYY-MM-DD') = '${date}')
UNION (
SELECT
tpc.data_source_id,
tpc.data_source,
q.completed as travel_date
FROM
trip_price_comparison tpc
INNER JOIN quotes q ON q.id = tpc.data_source_id
AND tpc.data_source = 2
AND root_id = ${root_id}
AND root_type = ${root_type}
WHERE
q.completed IS NOT NULL
AND to_char(q.completed, 'YYYY-MM-DD') = '${date}')
";
$rs = pg_query($con, $find_trip_query);
if (!$rs) {
echo "Cannot execute query: $find_trip_query\n";
}
while ($row = pg_fetch_object($rs)) {
$travel_date = $row->travel_date; // format: YYYY-MM-DD HH:II:SS
// convert $travel_date format from YYYY-MM-DD HH:II:SS to YYYY-MM-DD HH:00:00
$travel_date = substr($travel_date, 0, 13) . ':00:00';
// purpose: to compare travel date with weather date (YYYY-MM-DD HH:00:00)
$key = array_search($travel_date, array_column($weather_records_for_specific_date, 'time'));
if ($key === FALSE) {
continue;
}
$weather_id = $weather_records_for_specific_date[$key]['id'];
$insert_query = "INSERT INTO trip_weather(data_source_id, data_source, weather_id) VALUES
('" . pg_escape_string($row->data_source_id) . "', '" . pg_escape_string($row->data_source) . "', '" . pg_escape_string($weather_id) . "')
ON CONFLICT (data_source_id,data_source) DO NOTHING;";
$insert_rs = pg_query($con, $insert_query);
if (!$insert_rs) {
echo "Cannot execute query: $insert_query\n";
}
}
}
function get_weather_data_from_API($url)
{
// Get cURL resource
$curl = curl_init();
// Set some options - we are passing in a useragent too here
curl_setopt_array($curl, [
CURLOPT_RETURNTRANSFER => 1,
CURLOPT_URL => $url,
CURLOPT_HTTPHEADER, [
'content-type: application/json',
],
]);
// Send the request & save response to $result
$result = curl_exec($curl);
// Check HTTP status code
switch ($http_code = curl_getinfo($curl, CURLINFO_HTTP_CODE)) {
case 200:
break;
case 429:
echo "The API key has reached calls per day allowed limit. \n";
echo $result;
exit; // terminate worker
default:
echo "Something went wrong at call API with url: ${url}.\n";
echo $result;
exit; // terminate worker
}
// Close request to clear up some resources
curl_close($curl);
return json_decode($result);
}
/**
* Get the first api count
* @param none
* @return array
*/
function get_api_account()
{
global $con;
$query = "SELECT id, service_name, api_key, url
FROM weather_services
WHERE
active = TRUE
LIMIT 1";
$rs = pg_query($con, $query);
if (!$rs) {
echo "Cannot execute query: $query\n";
}
return pg_fetch_all($rs) ? pg_fetch_all($rs)[0] : [];
}
function format_weatheronline($data, $lat, $long, $geometry)
{
$weather = $data->data->weather;
$response_data = [];
foreach ($weather as $weather_by_date) {
foreach ($weather_by_date->hourly as $hourly) {
// becuase response time format: hmm (1hour: 100) -> we will convert to YYYY-MM-DD H:i:s
$hour = $hourly->time / 100;
$date = $weather_by_date->date . " ${hour}:00:00";
$date = DateTime::createFromFormat('Y-m-d G:i:s', $date);
$date = $date->format('Y-m-d H:i:s');
$response_data[$weather_by_date->date][] = [
'time' => $date,
'tempC' => $hourly->tempC,
'avgtempC' => $weather_by_date->avgtempC,
'totalSnow_cm' => $weather_by_date->totalSnow_cm,
'windspeedKmph' => $hourly->windspeedKmph,
'weatherCode' => $hourly->weatherCode,
'precipMM' => $hourly->precipMM,
'humidity' => $hourly->humidity,
'visibility' => $hourly->visibility,
'pressure' => $hourly->pressure,
'HeatIndexC' => $hourly->HeatIndexC,
'WindChillC' => $hourly->WindChillC,
'WindGustKmph' => $hourly->WindGustKmph,
'FeelsLikeC' => $hourly->FeelsLikeC,
'date' => $weather_by_date->date,
'location_start_lat' => $lat,
'location_start_lng' => $long,
'start_geometry' => $geometry,
'dewpoint' => 'null',
'precipitation' => 'null',
'precipitation_3' => 'null',
'precipitation_6' => 'null',
'winddirection' => 'null',
'condition' => 'null',
];
}
}
return $response_data;
}
function generate_insert_query($data)
{
$value = [];
foreach ($data as $element) {
$value[] =
"(
" . $element['tempC'] . " ,
" . $element['avgtempC'] . " ,
" . $element['totalSnow_cm'] . " ,
'" . $element['windspeedKmph'] . "' ,
'" . $element['weatherCode'] . "' ,
'" . $element['precipMM'] . "' ,
'" . $element['humidity'] . "' ,
'" . $element['visibility'] . "' ,
'" . $element['pressure'] . "' ,
'" . $element['HeatIndexC'] . "' ,
'" . $element['WindChillC'] . "' ,
'" . $element['WindGustKmph'] . "' ,
'" . $element['FeelsLikeC'] . "' ,
" . $element['location_start_lat'] . " ,
" . $element['location_start_lng'] . " ,
'" . $element['start_geometry'] . "' ,
'" . $element['date'] . "',
" . $element['dewpoint'] . " ,
" . $element['precipitation'] . " ,
" . $element['precipitation_3'] . " ,
" . $element['precipitation_6'] . " ,
" . $element['winddirection'] . " ,
" . $element['condition'] . " ,
'" . $element['time'] . "'
)";
}
$query = "INSERT INTO weather(
temp_c,
avg_temp_c,
total_snow_cm,
wind_speed_kmph,
weather_code,
precip_mm,
humidity,
visibility,
pressure,
heat_index_c,
wind_chill_c,
wind_gust_kmph,
feels_like_c,
latitude,
longitude,
geometry,
date,
dewpoint,
precipitation,
precipitation_3,
precipitation_6,
winddirection,
condition,
time
) VALUES" . implode(',', $value) . ' RETURNING id, time';
return $query;
}