cfgReadChar('database.host');
// NOTE(review): the line above is the tail of a statement that starts before the
// visible part of this file (presumably the $db_host assignment - confirm); it is
// reproduced unchanged.
$db_name = $savvyext->cfgReadChar('database.name');
$db_user = $savvyext->cfgReadChar('database.user');
$db_pass = $savvyext->cfgReadChar('database.pass');
$db_port = $savvyext->cfgReadLong('database.port');

// Connection string shared with the job callback and the query helpers via `global`.
$connstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$con = null;

$worker = new GearmanWorker();
$worker->addServers("127.0.0.1:4730");
$worker->addFunction("processExistingWeatherDataWorker", function (GearmanJob $job) {
    global $con, $connstr;

    // Called once per job. Per the PHP manual, pg_connect() hands back the already-open
    // connection when invoked again with an identical connection string, so this does
    // not open a new connection for every job.
    $con = pg_connect($connstr) or die("Could not connect to server\n");

    $workload = json_decode($job->workload(), true);
    execute($workload);

    echo "Waiting for job...\n";
});

// Process jobs until work() fails or reports a non-success return code.
while ($worker->work()) {
    if ($worker->returnCode() != GEARMAN_SUCCESS) {
        echo "return_code: " . $worker->returnCode() . "\n";
        break;
    }
}

/* ********************** FUNCTION ********************** */

/**
 * Handle one decoded job workload.
 *
 * Looks up the weather rows stored for the workload's travel date and start
 * coordinates and, when any exist, links them to the matching trips/quotes.
 *
 * A workload that is not an array (e.g. the job payload was not valid JSON) or
 * that lacks one of the required keys is logged and skipped, instead of passing
 * null into the typed helpers below and aborting the worker with a TypeError.
 *
 * @param mixed $workload Decoded JSON payload; expected keys: travel_date,
 *                        location_start_lat, location_start_lng, root_id, root_type.
 * @return void
 */
function execute($workload)
{
    if (!is_array($workload)) {
        echo "Skipping job: workload is not a valid JSON object\n";
        return;
    }

    $required_keys = ['travel_date', 'location_start_lat', 'location_start_lng', 'root_id', 'root_type'];
    foreach ($required_keys as $required_key) {
        if (!isset($workload[$required_key])) {
            echo "Skipping job: workload is missing '{$required_key}'\n";
            return;
        }
    }

    $weather_records_for_specific_date = get_weather_data_by_coor_and_date_in_weather_table(
        $workload['travel_date'],
        $workload['location_start_lat'],
        $workload['location_start_lng']
    );

    if (!empty($weather_records_for_specific_date)) {
        insert_trip_weather_with_existing_weather_data(
            $workload['root_id'],
            $workload['root_type'],
            $weather_records_for_specific_date,
            $workload['travel_date']
        );
    }
}

/**
 * Fetch the id and time of every `weather` row for a given date and coordinate.
 *
 * Uses the global $con connection. Values are bound as query parameters rather
 * than concatenated into the SQL text.
 *
 * @param string $travel_date        Value compared against weather.date.
 * @param float  $location_start_lat Value compared (exact equality) against weather.latitude.
 * @param float  $location_start_lng Value compared (exact equality) against weather.longitude.
 * @return array List of ['id' => ..., 'time' => ...] rows; empty when nothing matches
 *               or the query fails (the failure is echoed).
 */
function get_weather_data_by_coor_and_date_in_weather_table(string $travel_date, float $location_start_lat, float $location_start_lng): array
{
    global $con;

    // The ::numeric casts keep the coordinates typed the way the previous inline
    // numeric literals were, so the equality comparison resolves the same way.
    $query = '
        SELECT id, time
        FROM weather
        WHERE date = $1
            AND latitude = $2::numeric
            AND longitude = $3::numeric
    ';

    $rs = pg_query_params($con, $query, [$travel_date, $location_start_lat, $location_start_lng]);
    if (!$rs) {
        echo "Cannot execute query: $query\n";
        return [];
    }

    $result = [];
    while ($row = pg_fetch_object($rs)) {
        $result[] = [
            'id' => $row->id,
            'time' => $row->time
        ];
    }

    return $result;
}

/**
 * Link trips/quotes of one root on one date to already-stored weather rows.
 *
 * Selects, from trip_price_comparison, the parsed-email items (data_source = 1)
 * and quotes (data_source = 2) that belong to the given root and whose travel /
 * completion timestamp falls on $date. Each timestamp is truncated to the hour
 * and matched against the 'time' of the supplied weather records; for every
 * match a trip_weather row is inserted (existing rows are left untouched via
 * ON CONFLICT DO NOTHING).
 *
 * Uses the global $con connection.
 *
 * @param int    $root_id                           Root identifier to filter on.
 * @param int    $root_type                         Root type to filter on.
 * @param array  $weather_records_for_specific_date List of ['id' => ..., 'time' => ...] rows.
 * @param string $date                              Date in YYYY-MM-DD form.
 * @return void
 */
function insert_trip_weather_with_existing_weather_data(int $root_id, int $root_type, array $weather_records_for_specific_date, string $date)
{
    global $con;

    // find the trip/quote that has the same (root_id, root_type and travel_date) in trip_price_comparison table
    $find_trip_query = '
        (SELECT tpc.data_source_id, tpc.data_source, pitm.travel_date
        FROM trip_price_comparison tpc
        INNER JOIN parsedemail_item pitm ON pitm.id = tpc.data_source_id AND tpc.data_source = 1 AND root_id = $1 AND root_type = $2
        WHERE pitm.travel_date IS NOT NULL AND to_char(pitm.travel_date, \'YYYY-MM-DD\') = $3)
        UNION
        (SELECT tpc.data_source_id, tpc.data_source, q.completed as travel_date
        FROM trip_price_comparison tpc
        INNER JOIN quotes q ON q.id = tpc.data_source_id AND tpc.data_source = 2 AND root_id = $1 AND root_type = $2
        WHERE q.completed IS NOT NULL AND to_char(q.completed, \'YYYY-MM-DD\') = $3)
    ';

    $rs = pg_query_params($con, $find_trip_query, [$root_id, $root_type, $date]);
    if (!$rs) {
        echo "Cannot execute query: $find_trip_query\n";
    }

    echo "[" . date("Y-m-d H:i:s") . "] Processing trip_weather with root_id = {$root_id}, root_type = {$root_type}, date = {$date}\n";

    $insert_query = 'INSERT INTO trip_weather(data_source_id, data_source, weather_id) VALUES ($1, $2, $3) ON CONFLICT (data_source_id,data_source) DO NOTHING';

    // Loop-invariant: the list of weather times, index-aligned with the records.
    $weather_times = array_column($weather_records_for_specific_date, 'time');

    while ($rs && $row = pg_fetch_object($rs)) {
        // convert travel date format from YYYY-MM-DD HH:II:SS to YYYY-MM-DD HH:00:00
        // purpose: to compare travel date with weather date (YYYY-MM-DD HH:00:00)
        $travel_date = substr($row->travel_date, 0, 13) . ':00:00';

        $key = array_search($travel_date, $weather_times, true);
        if ($key === false) {
            continue;
        }

        $weather_id = $weather_records_for_specific_date[$key]['id'];

        $insert_rs = pg_query_params($con, $insert_query, [$row->data_source_id, $row->data_source, $weather_id]);
        if (!$insert_rs) {
            echo "Cannot execute query: $insert_query\n";
        }
    }
}