cfgReadChar('system.oauth2_token');

/*
 * update_cheapest_data job
 *
 * Anchors new trips (parsedemail_item rows, data_source=1) and quotes
 * (quotes rows, data_source=2) into price-comparison groups stored in
 * trip_price_comparison. A group is identified by its root record
 * (root_id/root_type); members are records whose start AND end addresses are
 * within $radius of the root's start/end addresses (ST_DWithin). Every member
 * row carries the group's average cost.
 *
 * Only records whose ids lie above the watermarks kept in global_settings and
 * up to the current max ids are processed; the watermarks are advanced at the
 * end of a run whose batch selection succeeded.
 *
 * Connections:
 *   $con                  primary: all writes, and every read of
 *                         trip_price_comparison.
 *   $readOnlyReplicaConn  replica: batch selection, settings and max ids.
 *
 * trip_price_comparison is written and re-read within the same run. Reading it
 * from the replica can return data that does not yet include this run's
 * inserts (replication lag), which leads to duplicate anchors and averages
 * that miss the rows just inserted, so those reads go to the primary.
 */

// Primary (read/write) connection.
$db_host = $savvyext->cfgReadChar('database.host');
$db_name = $savvyext->cfgReadChar('database.name');
$db_user = $savvyext->cfgReadChar('database.user');
$db_pass = $savvyext->cfgReadChar('database.pass');
$db_port = $savvyext->cfgReadLong('database.port');
$connstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$con = pg_connect($connstr);

// Read-only replica connection.
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');
$readOnlyReplicaConnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);

if (!$con || !$readOnlyReplicaConn) {
    // Nothing can be processed without both connections. Release the pid lock
    // (as the normal exit path at the end of this file does) and leave the
    // watermarks untouched so the same range is retried next time.
    echo "[" . date("Y-m-d H:i:s") . "] Could not connect to the database server\n";
    unlock_pid_file($lock_file);
    exit(1);
}

// Search radius passed to ST_DWithin (presumably metres, since the address
// geometries are compared with ::geography points - confirm).
$radius = 100;
// Only records whose travel_date lies within this many days before now are
// picked up by the similar-records search.
$numberOfDays = 365;

/*
// Init
INSERT INTO global_settings (key,description,value) VALUES ('last_compared_parsedemail_item','',0);
INSERT INTO global_settings (key,description,value) VALUES ('last_compared_quote','',0);
// Reset
TRUNCATE TABLE trip_price_comparison;
UPDATE global_settings SET value='0' WHERE key='last_compared_parsedemail_item';
UPDATE global_settings SET value='0' WHERE key='last_compared_quote';
*/

// 1. Box the data set: everything above the last processed ids (watermarks)
//    up to the current max ids of parsedemail_item and quotes.
$min_parsedemail_item_id = get_global_setting('last_compared_parsedemail_item', 0);
$min_quote_id = get_global_setting('last_compared_quote', 0);
$max_parsedemail_item_id = get_max_id('parsedemail_item');
$max_quote_id = get_max_id('quotes');

// 1.1. Select all the trips inside the boxed set, ordered by travel date from
//      the oldest to the newest.
$q = 'SELECT a.*'
    . ', a1.latitude AS location_start_lat, a1.longitude AS location_start_lng'
    . ', a2.latitude AS location_end_lat, a2.longitude AS location_end_lng'
    . ' FROM union_trip_and_quote_view_table a'
    . ' LEFT JOIN address a1 ON a1.id = a.location_start_id'
    . ' LEFT JOIN address a2 ON a2.id = a.location_end_id'
    . ' WHERE ((a.data_source = 1 AND a.data_source_id > $1 AND a.data_source_id <= $2)'
    . ' OR (a.data_source = 2 AND a.data_source_id > $3 AND a.data_source_id <= $4))'
    . ' AND a.cost > 0 ORDER BY a.travel_date ASC';
$r = pg_query_params($readOnlyReplicaConn, $q, [
    $min_parsedemail_item_id,
    $max_parsedemail_item_id,
    $min_quote_id,
    $max_quote_id,
]);

if ($r === false) {
    echo "[" . date("Y-m-d H:i:s") . "] Could not select new records: " . pg_last_error($readOnlyReplicaConn) . "\n";
} elseif (pg_num_rows($r) > 0) {
    $total = pg_num_rows($r);
    $i = 1;
    echo "[" . date("Y-m-d H:i:s") . "] Processing {$total} new records\n";

    // 2. Iterate through the trips.
    while ($f = pg_fetch_assoc($r)) {
        echo "[" . date("Y-m-d H:i:s") . "] Processing record {$i}/{$total} (" . sprintf("%0.02f", 100.0 * $i / $total) . "%)\n";

        // 2.1. Skip records that are already anchored to a root.
        $root_id = get_root_id($f["data_source_id"], $f["data_source"]);
        if ($root_id > 0) {
            echo "[" . date("Y-m-d H:i:s") . "] Record (" . $f["data_source_id"] . "," . $f["data_source"] . ") already anchored to {$root_id}\n";
            $i++;
            continue;
        }

        // 2.2. Look for the closest existing root within the radius. Skipped
        //      while both watermarks are still 0 (initial run).
        $root = null;
        if ($min_parsedemail_item_id > 0 || $min_quote_id > 0) {
            $root = get_closest_root($f);
            if (is_array($root)) {
                echo "[" . date("Y-m-d H:i:s") . "] Found root record (" . $root["data_source_id"] . "," . $root["data_source"] . ")\n";
                $root_id = $root["data_source_id"];
            }
        }

        // 2.3. Otherwise the current record becomes a new root.
        $root_is_new = ($root_id < 1);
        if ($root_is_new) {
            echo "[" . date("Y-m-d H:i:s") . "] Current record is the new root (" . $f["data_source_id"] . "," . $f["data_source"] . ")\n";
            $root = $f;
        }

        // 3. Anchor every not-yet-anchored record around the root.
        $similars = find_unanchored_similars($root);
        if ($similars && pg_num_rows($similars)) {
            $similar_count = pg_num_rows($similars);
            $i += $similar_count; // We assume we will advance that many (skip) in the future

            // A root found in 2.2 already has its own row; only a brand-new
            // root needs one (inserting it again would duplicate it).
            $current_anchored = $root_is_new;
            if ($root_is_new) {
                $id = add_trip_price_comparison($root, $root);
                echo "[" . date("Y-m-d H:i:s") . "] Created new root entry (" . $root["data_source_id"] . "," . $root["data_source"] . ") id={$id}\n";
            }

            echo "[" . date("Y-m-d H:i:s") . "] Processing {$similar_count} similar records\n";
            while ($similar = pg_fetch_assoc($similars)) {
                if ($similar["data_source_id"] == $root["data_source_id"] && $similar["data_source"] == $root["data_source"]) {
                    continue;
                }
                // 3.1. Insert new records into trip_price_comparison.
                $id = add_trip_price_comparison($similar, $root);
                echo "[" . date("Y-m-d H:i:s") . "] Created new trip price comparison record id={$id}\n";
                if ($similar["data_source_id"] == $f["data_source_id"] && $similar["data_source"] == $f["data_source"]) {
                    $current_anchored = true;
                }
            }

            // The current record may be missing from the similar set (e.g. its
            // travel_date is outside the $numberOfDays window). Anchor it
            // anyway, otherwise it is lost once the watermarks advance.
            if (!$current_anchored) {
                $id = add_trip_price_comparison($f, $root);
                echo "[" . date("Y-m-d H:i:s") . "] Created new trip price comparison record id={$id}\n";
            }

            // 3.2. / 3.3. Recompute and store the group average.
            refresh_root_average($root);
        } else {
            // Single!
            $id = add_trip_price_comparison($f, $root);
            echo "[" . date("Y-m-d H:i:s") . "] Created singular trip price comparison record id={$id}\n";
            // Joining an existing group changes that group's average.
            if (!$root_is_new) {
                refresh_root_average($root);
            }
        }
        $i++;
    }
}

// Advance the watermarks only when the batch was actually selected; after a
// failed selection the same id range must be retried on the next run.
if ($r !== false) {
    update_global_setting('last_compared_parsedemail_item', $max_parsedemail_item_id);
    update_global_setting('last_compared_quote', $max_quote_id);
}

pg_close($readOnlyReplicaConn);
pg_close($con);
echo "[" . date("Y-m-d H:i:s") . "] update_cheapest_data job complete.\n";

/*********************************** Function ***********************************/

/**
 * Read a value from global_settings (key compared case-insensitively).
 *
 * @param string $key Setting key.
 * @param mixed  $val Value returned when the key is missing or the query fails.
 * @return mixed The stored value (as returned by pgsql, i.e. a string or null), or $val.
 */
function get_global_setting($key, $val = 0)
{
    global $readOnlyReplicaConn;
    $r = pg_query_params(
        $readOnlyReplicaConn,
        'SELECT value FROM global_settings WHERE lower(key) = lower($1::text)',
        [$key]
    );
    if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r))) {
        return $f[0];
    }
    return $val;
}

/**
 * Store a value in global_settings on the primary (key compared case-insensitively).
 * The row must already exist; nothing is inserted.
 *
 * @param string $key Setting key.
 * @param mixed  $val New value.
 * @return int Number of rows updated (0 on failure or missing key).
 */
function update_global_setting($key, $val)
{
    global $con;
    $r = pg_query_params(
        $con,
        'UPDATE global_settings SET value = $1 WHERE lower(key) = lower($2::text)',
        [$val, $key]
    );
    if ($r) {
        return pg_affected_rows($r);
    }
    return 0;
}

/**
 * Highest id in a table, read from the replica.
 *
 * @param string $what Table name (quoted as an identifier).
 * @return mixed The max id as returned by pgsql, or 0 when the table is empty
 *               or the query fails.
 */
function get_max_id($what)
{
    global $readOnlyReplicaConn;
    $q = 'SELECT max(id) FROM ' . pg_escape_identifier($readOnlyReplicaConn, $what);
    $r = pg_query($readOnlyReplicaConn, $q);
    if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r)) && $f[0] !== null) {
        return $f[0];
    }
    return 0;
}

/**
 * root_id of a record already present in trip_price_comparison.
 * Reads the primary so anchors created earlier in this run are visible.
 *
 * @return mixed The root_id, or 0 when the record is not anchored yet.
 */
function get_root_id($data_source_id, $data_source)
{
    global $con;
    $r = pg_query_params(
        $con,
        'SELECT root_id FROM trip_price_comparison WHERE data_source_id = $1 AND data_source = $2',
        [$data_source_id, $data_source]
    );
    if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r))) {
        return $f[0];
    }
    return 0;
}

/**
 * Insert $data into trip_price_comparison, anchored to $root.
 * The row's initial average is the root's cost; it is refreshed afterwards by
 * update_average_by_root_id().
 *
 * @param array $data Record to anchor (data_source_id, data_source, cost).
 * @param array $root Root record (data_source_id, data_source, cost).
 * @return mixed The new row id, or 0 on failure (query, params and error are logged).
 */
function add_trip_price_comparison($data, $root)
{
    global $con;
    $q = 'INSERT INTO trip_price_comparison (data_source_id, data_source, root_id, root_type, cost, average)'
        . ' VALUES ($1, $2, $3, $4, $5, $6) RETURNING id';
    $params = [
        $data["data_source_id"],
        $data["data_source"],
        $root["data_source_id"],
        $root["data_source"],
        $data["cost"],
        $root["cost"],
    ];
    $r = pg_query_params($con, $q, $params);
    if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r))) {
        return $f[0];
    }
    echo "[" . date("Y-m-d H:i:s") . "] " . $q . " " . json_encode($params) . "\n";
    echo "[" . date("Y-m-d H:i:s") . "] " . pg_last_error($con) . "\n";
    return 0;
}

/**
 * Average cost (rounded to 2 decimals) of all rows anchored to a root.
 * Reads the primary so rows inserted moments ago are included.
 *
 * @return mixed The average as returned by pgsql, or 0.0 on failure.
 */
function get_average_by_root_id($root_id, $root_type)
{
    global $con;
    $r = pg_query_params(
        $con,
        'SELECT ROUND(avg(cost), 2) FROM trip_price_comparison WHERE root_id = $1 AND root_type = $2',
        [$root_id, $root_type]
    );
    if ($r && pg_num_rows($r) && ($f = pg_fetch_row($r))) {
        return $f[0];
    }
    return 0.0;
}

/**
 * Store $average on every row anchored to the given root.
 *
 * @return int Number of rows updated (0 on failure).
 */
function update_average_by_root_id($average, $root_id, $root_type)
{
    global $con;
    $r = pg_query_params(
        $con,
        'UPDATE trip_price_comparison SET average = $1 WHERE root_id = $2 AND root_type = $3',
        [$average, $root_id, $root_type]
    );
    if ($r) {
        return pg_affected_rows($r);
    }
    return 0;
}

/**
 * Steps 3.2/3.3: recompute a group's average cost, store it on every member
 * row and log both values.
 *
 * @param array $root Root record; its data_source_id/data_source identify the group.
 */
function refresh_root_average($root)
{
    $average = get_average_by_root_id($root["data_source_id"], $root["data_source"]);
    echo "[" . date("Y-m-d H:i:s") . "] Average for (" . $root["data_source_id"] . "," . $root["data_source"] . ") = " . sprintf("%0.02f", $average) . "\n";
    $num = update_average_by_root_id($average, $root["data_source_id"], $root["data_source"]);
    echo "[" . date("Y-m-d H:i:s") . "] Updated {$num} anchored records\n";
}

/**
 * Closest existing root (a row whose root is itself) whose start and end
 * addresses are both within $radius of $data's start/end coordinates, ordered
 * by start distance, then end distance. Reads the primary so roots created
 * earlier in this run are found.
 *
 * @param array $data Record with location_start_lat/lng and location_end_lat/lng.
 * @return array|null The root row (view columns plus trip_price_comparison
 *                    columns and lat/lng aliases), or null when none matches
 *                    or the query fails (failures are logged).
 */
function get_closest_root($data)
{
    global $con, $radius;
    $q = 'SELECT a.*, b.data_source_id, b.data_source, b.root_id, b.root_type'
        . ', a1.latitude AS location_start_lat, a1.longitude AS location_start_lng'
        . ', a2.latitude AS location_end_lat, a2.longitude AS location_end_lng'
        . ' FROM union_trip_and_quote_view_table a'
        . ' LEFT JOIN address a1 ON a1.id = a.location_start_id'
        . ' LEFT JOIN address a2 ON a2.id = a.location_end_id'
        . ' LEFT JOIN trip_price_comparison b ON (b.data_source_id = a.data_source_id AND b.data_source = a.data_source)'
        . ' WHERE b.root_id IS NOT NULL AND b.root_id = b.data_source_id AND b.root_type = b.data_source'
        . ' AND ST_DWithin(a1.geometry, ST_MakePoint($1::float8, $2::float8)::geography, $5::float8)'
        . ' AND ST_DWithin(a2.geometry, ST_MakePoint($3::float8, $4::float8)::geography, $5::float8)'
        . ' ORDER BY ST_DistanceSphere(a1.geometry::geometry, ST_SetSRID(ST_MakePoint($1::float8, $2::float8), 4326)),'
        . ' ST_DistanceSphere(a2.geometry::geometry, ST_SetSRID(ST_MakePoint($3::float8, $4::float8), 4326))'
        . ' LIMIT 1';
    $params = [
        $data["location_start_lng"],
        $data["location_start_lat"],
        $data["location_end_lng"],
        $data["location_end_lat"],
        $radius,
    ];
    $r = pg_query_params($con, $q, $params);
    if ($r === false) {
        echo "[" . date("Y-m-d H:i:s") . "] " . $q . " " . json_encode($params) . "\n";
        echo "[" . date("Y-m-d H:i:s") . "] " . pg_last_error($con) . "\n";
        return null;
    }
    $f = pg_fetch_assoc($r);
    return is_array($f) ? $f : null;
}

/**
 * Step 3: records around $root that are not anchored to any root yet, whose
 * travel_date lies within the last $numberOfDays days and whose cost is
 * positive (matching the batch selection). Reads the primary so anchors
 * created earlier in this run are excluded.
 * NOTE(review): the original asked whether the id box should apply here too.
 *
 * @param array $root Record with location_start_lat/lng and location_end_lat/lng.
 * @return mixed pgsql query result, or false on failure.
 */
function find_unanchored_similars($root)
{
    global $con, $radius, $numberOfDays;
    $q = 'SELECT a.* FROM union_trip_and_quote_view_table a'
        . ' LEFT JOIN address a1 ON a1.id = a.location_start_id'
        . ' LEFT JOIN address a2 ON a2.id = a.location_end_id'
        . ' LEFT JOIN trip_price_comparison b ON (b.data_source_id = a.data_source_id AND b.data_source = a.data_source)'
        . ' WHERE a.travel_date BETWEEN NOW() - ($6::int * INTERVAL \'1 day\') AND NOW()'
        . ' AND b.root_id IS NULL'
        . ' AND a.cost > 0'
        . ' AND ST_DWithin(a1.geometry, ST_MakePoint($1::float8, $2::float8)::geography, $5::float8)'
        . ' AND ST_DWithin(a2.geometry, ST_MakePoint($3::float8, $4::float8)::geography, $5::float8)';
    return pg_query_params($con, $q, [
        $root["location_start_lng"],
        $root["location_start_lat"],
        $root["location_end_lng"],
        $root["location_end_lat"],
        $radius,
        $numberOfDays,
    ]);
}

unlock_pid_file($lock_file);
?>