cfgReadChar('system.oauth2_token');

// Quote a value for a libpq conninfo string: wrap it in single quotes and
// backslash-escape any ' or \ inside it, as libpq requires. Without this, a value
// (typically the password) containing a space or a quote corrupts the string.
$pg_conninfo_value = function ($value) {
    return "'" . addcslashes((string) $value, "'\\") . "'";
};

// Main connection; later in this script it is the one used for the INSERT/UPDATE writes.
$db_host = $savvyext->cfgReadChar('database.host');
$db_name = $savvyext->cfgReadChar('database.name');
$db_user = $savvyext->cfgReadChar('database.user');
$db_pass = $savvyext->cfgReadChar('database.pass');
$db_port = $savvyext->cfgReadLong('database.port');
$connstr = "host=" . $pg_conninfo_value($db_host)
    . " port=" . $pg_conninfo_value($db_port)
    . " dbname=" . $pg_conninfo_value($db_name)
    . " user=" . $pg_conninfo_value($db_user)
    . " password=" . $pg_conninfo_value($db_pass);
$con = pg_connect($connstr);

// Read-only replica connection; the SELECTs in this script run against it.
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');
$readOnlyReplicaConnstr = "host=" . $pg_conninfo_value($db_host)
    . " port=" . $pg_conninfo_value($db_port)
    . " dbname=" . $pg_conninfo_value($db_name)
    . " user=" . $pg_conninfo_value($db_user)
    . " password=" . $pg_conninfo_value($db_pass);
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);

// Both connections are required; release the job lock before bailing out.
if ($con === FALSE || $readOnlyReplicaConn === FALSE) {
    unlock_pid_file($lock_file);
    die("Could not connect to server\n");
}

// NOTE(review): $rate is not referenced anywhere in the visible part of this script.
$rate = 1.1;
// Size of the look-back window (in days) applied to travel_date when reading new trips.
$numberOfDays = 365;
// This is handled by update cheapest data job now...
$city_id = 1;
$tz = 'Asia/Singapore';
$transport_providers = [3, 4, 5];
$min_distance = 0.1; // 0.1km
$min_rate = 0.2;     // 20 cents

// Load max IDs. These are the upper bound of the id range folded in by this run.
// They default to 0 so that an empty table (MAX() returns NULL) or a failed query
// still yields a valid, empty "a.id<=0" range instead of malformed SQL.
$max_parsedemail_item_id = 0;
$q = "SELECT max(id) FROM parsedemail_item WHERE base_cost>0 AND dup_id IS NULL AND private='f' AND booking='f' AND transport_provider_id IS NOT NULL";
$r = pg_query($readOnlyReplicaConn, $q);
if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r))) {
    $max_parsedemail_item_id = (int) $f[0];
} elseif ($r === false) {
    echo "[" . date("Y-m-d H:i:s") . "] ERROR: " . pg_last_error($readOnlyReplicaConn) . "\n";
}

$max_quote_id = 0;
$q = "SELECT max(id) FROM quotes WHERE cost>0";
$r = pg_query($readOnlyReplicaConn, $q);
if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r))) {
    $max_quote_id = (int) $f[0];
} elseif ($r === false) {
    echo "[" . date("Y-m-d H:i:s") . "] ERROR: " . pg_last_error($readOnlyReplicaConn) . "\n";
}

// Load areas of the city, indexed by area id.
$areas = [];
$q = "SELECT * FROM geofence_area WHERE city_id=" . (int) $city_id;
$r = pg_query($readOnlyReplicaConn, $q);
if ($r && pg_num_rows($r)) {
    while ($f = pg_fetch_assoc($r)) {
        $areas[$f["id"]] = $f;
    }
}

// Load previous data: start area id => ("<start>_<end>" => provider => hour => entry).
$average_quotes = [];
foreach ($areas as $f) {
    $average_quotes[$f["id"]] = load_average_quotes_for_area($f, $areas, $transport_providers);
}

// Start processing
foreach ($average_quotes as $area_id => $average_quote) {
    echo "[" . date("Y-m-d H:i:s") . "] Processing area ID #{$area_id}\n";
    process_area_average_quotes($average_quote, $areas, $area_id, $min_distance, $min_rate);
}

/*********************************** Function ***********************************/

/**
 * Build the in-memory running averages for every pair (start = $area, end = any area).
 *
 * Every (end area, provider, hour 0-23) slot is first pre-filled with an empty, unsaved
 * entry (id 0). Rows already stored in geofence_area_average_quotes for this start
 * area then replace the matching slots.
 *
 * @param array $area                geofence_area row used as the start area
 * @param array $areas               all geofence_area rows of the city, indexed by id
 * @param array $transport_providers provider ids to pre-fill
 * @return array "<start>_<end>" => provider id => hour => entry
 */
function load_average_quotes_for_area($area, $areas, $transport_providers)
{
    global $readOnlyReplicaConn;

    $average_quotes = [];

    // Pre-fill the initial data: unique(area_start_id,area_end_id,transport_provider_id,hour)
    foreach ($areas as $end_area) {
        $key = $area["id"] . "_" . $end_area["id"];
        foreach ($transport_providers as $transport_provider_id) {
            for ($hour = 0; $hour < 24; $hour++) {
                $average_quotes[$key][$transport_provider_id][$hour] = [
                    "id" => 0,
                    "area_start_id" => $area["id"],
                    "area_end_id" => $end_area["id"],
                    "transport_provider_id" => $transport_provider_id,
                    "average_cost" => 0,
                    "average_total" => 0,
                    "average_count" => 0,
                    "hour" => $hour,
                    "last_quotes_id" => 0,
                    "last_parsedemail_item_id" => 0,
                ];
            }
        }
    }

    // Overlay the rows already stored for this start area (nested keys are auto-created
    // for rows whose end area / provider was not pre-filled).
    $q = "SELECT * FROM geofence_area_average_quotes WHERE area_start_id=" . (int) $area["id"];
    $r = pg_query($readOnlyReplicaConn, $q);
    if ($r && pg_num_rows($r)) {
        while ($f = pg_fetch_assoc($r)) {
            $key = $f["area_start_id"] . "_" . $f["area_end_id"];
            $average_quotes[$key][$f["transport_provider_id"]][$f["hour"]] = $f;
        }
    }

    return $average_quotes;
}

/**
 * Return the highest last_quotes_id / last_parsedemail_item_id recorded for any
 * (provider, hour) entry of one area pair.
 *
 * Only entries that received new rows are saved (and so advanced) by a run. The pair's
 * true "processed up to" position is therefore the maximum across its entries, not the
 * value of any single entry. Using a lower value would re-add rows that the other
 * entries already counted.
 *
 * @param array $vals provider id => hour => entry
 * @return array ["last_quotes_id" => int, "last_parsedemail_item_id" => int]
 */
function geofence_key_watermarks($vals)
{
    $watermark = ["last_quotes_id" => 0, "last_parsedemail_item_id" => 0];
    foreach ($vals as $hours) {
        foreach ($hours as $entry) {
            $watermark["last_quotes_id"] = max($watermark["last_quotes_id"], (int) $entry["last_quotes_id"]);
            $watermark["last_parsedemail_item_id"] = max($watermark["last_parsedemail_item_id"], (int) $entry["last_parsedemail_item_id"]);
        }
    }
    return $watermark;
}

/**
 * Quote a value as a PostgreSQL string literal (including the surrounding quotes).
 */
function geofence_sql_literal($value)
{
    global $readOnlyReplicaConn;
    return pg_escape_literal($readOnlyReplicaConn, (string) $value);
}

/**
 * Fold new quotes / parsedemail_item rows into the stored running averages of every
 * area pair that starts at one area.
 *
 * For each pair, the rows with id in (pair watermark, current max id] are read, grouped
 * by (provider, hour). Each group is added to the stored total/count; the average is
 * recomputed and the entry is written via save_or_update().
 *
 * @param array $average_quote "<start>_<end>" => provider id => hour => entry
 * @param array $areas         geofence_area rows of the city, indexed by id
 * @param mixed $area_id       start area id (only used by the caller for logging)
 * @param float $min_distance  parsedemail_item rows at least this far (km) must pass the rate check
 * @param float $min_rate      minimum base_cost/distance for such rows
 */
function process_area_average_quotes($average_quote, $areas, $area_id, $min_distance = 0.1, $min_rate = 0.2)
{
    global $readOnlyReplicaConn, $max_parsedemail_item_id, $max_quote_id, $numberOfDays, $tz, $transport_providers;

    // Area pairs (start, end) whose generated SQL is echoed for debugging.
    $debug_pairs = [[23, 9], [12, 4], [14, 16], [9, 14], [23, 22], [17, 7], [22, 22]];

    $provider_list = implode(",", array_map('intval', $transport_providers));
    $days = (int) $numberOfDays;
    $max_quote = (int) $max_quote_id;
    $max_parsedemail = (int) $max_parsedemail_item_id;

    foreach ($average_quote as $key => $vals) {
        echo "[" . date("Y-m-d H:i:s") . "] Processing key{from,to}={$key}\n";

        // Any entry of the pair identifies its two areas; take the first one.
        $first_hours = reset($vals);
        $item = is_array($first_hours) ? reset($first_hours) : false;
        if (!is_array($item)) {
            continue;
        }
        if (!isset($areas[$item["area_start_id"]], $areas[$item["area_end_id"]])) {
            echo "[" . date("Y-m-d H:i:s") . "] WARNING({$key}): area not loaded, skipping\n";
            continue;
        }

        $watermark = geofence_key_watermarks($vals);
        if ($max_parsedemail <= $watermark["last_parsedemail_item_id"] && $max_quote <= $watermark["last_quotes_id"]) {
            echo "[" . date("Y-m-d H:i:s") . "] No new records to compare for the key={$key}\n";
            continue;
        }

        $from_area = $areas[$item["area_start_id"]];
        $to_area = $areas[$item["area_end_id"]];

        try {
            // Geofence conditions on the start (b) / end (c) address; may throw on bad boundaries.
            $area_filter = process_area_filter($from_area, $to_area);
            $country_filter = " AND (b.country=" . geofence_sql_literal($from_area["country"])
                . " OR c.country=" . geofence_sql_literal($to_area["country"]) . ") ";

            // We assume quotes has UTC timestamp without timezone
            $q = "(SELECT a.cost, a.transport_provider_id, "
                . " date_part('hour',(a.travel_date at time zone 'utc' at time zone " . geofence_sql_literal($tz) . ")) AS hour FROM quotes a "
                . " JOIN address AS b ON (b.id = a.location_start_id) "
                . " JOIN address AS c ON (c.id = a.location_end_id) "
                . " WHERE a.cost>0 AND a.travel_date BETWEEN NOW() - INTERVAL '{$days} days' AND NOW()"
                . $country_filter
                . " AND a.transport_provider_id IN ({$provider_list})"
                . " AND a.id>" . $watermark["last_quotes_id"] . " AND a.id<=" . $max_quote
                . $area_filter
                . ") UNION ALL ("
                // We assume parsedemail_item has local timestamp without timezone
                . "SELECT a.base_cost, a.transport_provider_id, date_part('hour', a.travel_date) AS hour FROM parsedemail_item a "
                . " JOIN address AS b ON (b.id = a.location_start_id) "
                . " JOIN address AS c ON (c.id = a.location_end_id) "
                . " WHERE a.base_cost>0 AND a.travel_date BETWEEN NOW() - INTERVAL '{$days} days' AND NOW()"
                . " AND a.dup_id IS NULL AND a.private='f' AND a.booking='f' AND a.transport_provider_id IS NOT NULL "
                . $country_filter
                . " AND a.transport_provider_id IN ({$provider_list})"
                . " AND a.id>" . $watermark["last_parsedemail_item_id"] . " AND a.id<=" . $max_parsedemail
                // Keep rows with no distance, short trips, or a plausible cost per km.
                . " AND (a.distance IS NULL OR a.distance<" . (float) $min_distance . " OR a.base_cost/a.distance>" . (float) $min_rate . ")"
                . $area_filter
                . ")";

            // Master query: per (provider, hour) average/sum/count of the new rows.
            $mq = "SELECT ROUND(AVG(a.cost), 2) as average_cost, ROUND(SUM(a.cost),2) AS average_total, COUNT(*) AS average_count, a.transport_provider_id, a.hour "
                . " FROM ({$q}) AS a GROUP BY a.transport_provider_id,a.hour";

            if (in_array([(int) $item["area_start_id"], (int) $item["area_end_id"]], $debug_pairs, true)) {
                echo "[" . date("Y-m-d H:i:s") . "] Area '" . $item["area_start_id"] . "' to area '" . $item["area_end_id"] . "' \n";
                echo $mq . ";\n\n";
            }

            $r = pg_query($readOnlyReplicaConn, $mq);
            if ($r === false) {
                echo "[" . date("Y-m-d H:i:s") . "] ERROR({$key}): " . pg_last_error($readOnlyReplicaConn) . "\n";
                echo "[" . date("Y-m-d H:i:s") . "] {$mq}\n";
                continue;
            }
            if (pg_num_rows($r) === 0) {
                echo "[" . date("Y-m-d H:i:s") . "] WARNING({$key}): No trips found between areas?\n";
                continue;
            }

            // Never move a watermark backwards (filtered MAX(id) can shrink, e.g. when dup_id is set later).
            $new_last_quotes_id = max($watermark["last_quotes_id"], $max_quote);
            $new_last_parsedemail_item_id = max($watermark["last_parsedemail_item_id"], $max_parsedemail);

            while ($f = pg_fetch_assoc($r)) {
                // unique(area_start_id,area_end_id,transport_provider_id,hour)
                $provider = $f["transport_provider_id"];
                $hour = (int) $f["hour"];
                if (!isset($vals[$provider][$hour])) {
                    echo "[" . date("Y-m-d H:i:s") . "] WARNING({$key}): unexpected provider/hour {$provider}/{$hour}\n";
                    continue;
                }
                $entry = $vals[$provider][$hour];

                $new_average_count = (int) $entry["average_count"] + (int) $f["average_count"];
                $new_average_total = (float) $entry["average_total"] + (float) $f["average_total"];
                $new_average_cost = $f["average_cost"];
                if ($new_average_count > 0) {
                    $new_average_cost = sprintf("%0.02f", $new_average_total / $new_average_count);
                }

                $entry["average_cost"] = $new_average_cost;
                $entry["average_total"] = $new_average_total;
                $entry["average_count"] = $new_average_count;
                $entry["last_quotes_id"] = $new_last_quotes_id;
                $entry["last_parsedemail_item_id"] = $new_last_parsedemail_item_id;
                save_or_update($entry);
            }
        } catch (Exception $e) {
            echo "[" . date("Y-m-d H:i:s") . "] ERROR({$key}): " . $e->getMessage() . "\n";
        }
    }
}

/**
 * Persist one running-average entry on the main connection.
 *
 * Entries with id > 0 are UPDATEd by id. Others are INSERTed, falling back to an
 * UPDATE on the (area_start_id, area_end_id, transport_provider_id, hour) unique key.
 *
 * @param array $data entry as built by load_average_quotes_for_area()
 * @return string|null the row id, or NULL when the query failed (logged)
 */
function save_or_update($data)
{
    global $con;

    // %F is locale-independent, so the decimal separator is always "." in SQL.
    $average_cost = sprintf("%0.02F", $data["average_cost"]);
    $average_total = sprintf("%0.02F", $data["average_total"]);
    $average_count = (int) $data["average_count"];
    $last_quotes_id = (int) $data["last_quotes_id"];
    $last_parsedemail_item_id = (int) $data["last_parsedemail_item_id"];

    $set = "average_cost=" . $average_cost
        . ",average_total=" . $average_total
        . ",average_count=" . $average_count
        . ",last_updated=NOW()"
        . ",last_quotes_id=" . $last_quotes_id
        . ",last_parsedemail_item_id=" . $last_parsedemail_item_id;

    if ((int) $data["id"] > 0) {
        // Update
        $q = "UPDATE geofence_area_average_quotes SET " . $set . " WHERE id=" . (int) $data["id"] . " RETURNING id";
    } else {
        // Save
        $values = [
            (int) $data["area_start_id"],
            (int) $data["area_end_id"],
            (int) $data["transport_provider_id"],
            $average_cost,
            $average_total,
            $average_count,
            (int) $data["hour"],
            $last_quotes_id,
            $last_parsedemail_item_id,
        ];
        $q = "INSERT INTO geofence_area_average_quotes (area_start_id,area_end_id,transport_provider_id,average_cost,average_total,average_count,hour,last_quotes_id,last_parsedemail_item_id)"
            . " VALUES(" . implode(",", $values) . ") "
            . " ON CONFLICT (area_start_id, area_end_id, transport_provider_id, hour) DO UPDATE SET " . $set . " RETURNING id";
    }

    $r = pg_query($con, $q);
    if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r))) {
        return $f[0]; // ID
    }
    echo "[" . date("Y-m-d H:i:s") . "] ERROR: {$q}\n";
    echo "[" . date("Y-m-d H:i:s") . "] " . pg_last_error($con) . "\n";
    return NULL;
}

/*
CREATE TABLE geofence_area_average_quotes (
    id bigserial,
    area_start_id int references geofence_area(id),
    area_end_id int references geofence_area(id),
    transport_provider_id int references transport_providers(id),
    average_cost numeric,
    average_total numeric,
    average_count int,
    hour smallint not null,
    last_updated timestamp default now(),
    last_quotes_id bigint,
    last_parsedemail_item_id bigint,
    primary key(id),
    unique(area_start_id,area_end_id,transport_provider_id,hour)
);
*/

/**
 * Build the SQL conditions restricting the start address (alias b) to $from_area and the
 * end address (alias c) to $to_area, according to each area's "type".
 *
 * Types "radius", "postal" and "polygon" are handled; any other type adds no condition.
 *
 * @return string zero or more " AND (...)" fragments
 * @throws Exception when an area's JSON "boundaries" lacks the data its type needs
 */
function process_area_filter($from_area, $to_area)
{
    $from_boundaries = json_decode($from_area["boundaries"], true);
    $to_boundaries = json_decode($to_area["boundaries"], true);

    $q = "";
    if ($from_area['type'] === 'radius') {
        $q .= process_area_filter_radius('b', $from_area, $from_boundaries);
    }
    if ($to_area['type'] === 'radius') {
        $q .= process_area_filter_radius('c', $to_area, $to_boundaries);
    }
    if ($from_area['type'] === 'postal') {
        $q .= process_area_filter_postal('b', $from_area, $from_boundaries);
    }
    if ($to_area['type'] === 'postal') {
        $q .= process_area_filter_postal('c', $to_area, $to_boundaries);
    }
    if ($from_area['type'] === 'polygon') {
        $q .= process_area_filter_polygon('b', $from_area, $from_boundaries);
    }
    if ($to_area['type'] === 'polygon') {
        $q .= process_area_filter_polygon('c', $to_area, $to_boundaries);
    }
    return $q;
}

/**
 * Condition: address $ref lies within boundaries["radius"] of the area's "location".
 * NOTE(review): the radius unit is whatever ST_DWithin uses for the column type — confirm.
 */
function process_area_filter_radius($ref, $area, $boundaries)
{
    if (is_array($boundaries) && array_key_exists("radius", $boundaries)
        && is_numeric($boundaries["radius"]) && $boundaries["radius"] > 0) {
        return " AND ST_DWithin(" . geofence_sql_literal($area['location']) . ", {$ref}.geometry, " . $boundaries["radius"] . ")";
    }
    throw new Exception("Missing radius in boundaries field");
}

/**
 * Condition: address $ref's postal code starts with one of boundaries["postal_code"].
 */
function process_area_filter_postal($ref, $area, $boundaries)
{
    if (is_array($boundaries) && array_key_exists("postal_code", $boundaries)
        && is_array($boundaries["postal_code"]) && count($boundaries["postal_code"]) > 0) {
        $conditions = [];
        foreach ($boundaries["postal_code"] as $code) {
            $code = (string) $code;
            $conditions[] = " LEFT({$ref}.postal," . strlen($code) . ")=" . geofence_sql_literal($code) . " ";
        }
        return " AND (" . implode(" OR ", $conditions) . ")";
    }
    throw new Exception("Missing postal_code in boundaries field");
}

/**
 * Condition: address $ref lies inside the polygon boundaries["polygon"], given as a list of
 * [x, y] pairs (SRID 4326). The ring is closed automatically, as WKT requires.
 */
function process_area_filter_polygon($ref, $area, $boundaries)
{
    if (is_array($boundaries) && array_key_exists("polygon", $boundaries)
        && is_array($boundaries["polygon"]) && count($boundaries["polygon"]) > 0) {
        $polygonCoords = [];
        foreach ($boundaries["polygon"] as $coord) {
            if (!is_array($coord) || !isset($coord[0], $coord[1])
                || !is_numeric($coord[0]) || !is_numeric($coord[1])) {
                throw new Exception("Invalid coordinate in polygon boundaries field");
            }
            $polygonCoords[] = $coord[0] . " " . $coord[1];
        }
        // PostGIS rejects non-closed rings; repeat the first point if needed.
        if ($polygonCoords[0] !== $polygonCoords[count($polygonCoords) - 1]) {
            $polygonCoords[] = $polygonCoords[0];
        }
        $polygonCoordsString = implode(",", $polygonCoords);
        return " AND ST_Contains(ST_GeomFromText('POLYGON(({$polygonCoordsString}))', 4326), {$ref}.geometry::geometry) ";
    }
    throw new Exception("Missing polygon in boundaries field");
}

pg_close($con);
pg_close($readOnlyReplicaConn);
unlock_pid_file($lock_file);
echo "[" . date("Y-m-d H:i:s") . "] geofence_area_average_quotes job complete.\n";