addServers($gearmanServers);

/*
 * Tracklocation scheduler loop.
 *
 * Once a minute:
 *   1. deletes tracking_data_job rows whose "updated" timestamp is older than one hour;
 *   2. scans $tracklocation_dir for tl_*.json files, oldest first;
 *   3. for every session found (third "_"-separated segment of the file name) tries to
 *      take the session lock in tracking_data_job; when the lock is taken, the session's
 *      files are renamed to "*.lock" and a background "tl_decompress" Gearman job is
 *      queued for that session.
 *
 * Each session is looked at only once per cycle. Sessions that are already locked, or
 * whose lock could not be taken, are left for a later cycle instead of being retried in
 * a tight loop.
 *
 * Relies on $client, $pgconn and $tracklocation_dir being set up earlier in the script.
 */

// Writes one timestamped line to stdout in the format used throughout this script.
$tlLog = static function ($message) {
    echo "[".date("Y-m-d H:i:s")."] ".$message."\n";
};

// Returns the session segment of a file name, or '' when the name has fewer than three
// "_"-separated segments (or is not a usable string at all, e.g. reset() on an empty list).
$tlSessionOf = static function ($fileName) {
    if (!is_string($fileName)) {
        return '';
    }
    $segments = explode('_', $fileName);

    return isset($segments[2]) ? $segments[2] : '';
};

// Lists directory entries matching $pattern; an unreadable directory yields an empty list.
$tlScan = static function ($dir, $pattern) {
    $entries = scandir($dir);
    if ($entries === false) {
        return [];
    }
    $matches = preg_grep($pattern, $entries);

    return is_array($matches) ? $matches : [];
};

// Tries to mark $session as locked in tracking_data_job.
// Returns true only when the lock row was written by this call; false when the session
// is already locked or when any of the queries failed.
$tlAcquireLock = static function ($pgconn, $session) use ($tlLog) {
    // The session comes from a file name, so it is always passed as a bound parameter.
    $lookup = pg_query_params(
        $pgconn,
        'SELECT id, locked FROM tracking_data_job WHERE session = $1',
        [$session]
    );
    if ($lookup === false) {
        $tlLog("Could not read lock state for session {$session}, skipping it this cycle");
        return false;
    }

    $row = pg_fetch_assoc($lookup);
    if ($row) {
        // Anything other than an explicit "false" value is treated as locked.
        $unlocked = $row["locked"] === 'false' || $row["locked"] === 'f' || $row["locked"] === false;
        if (!$unlocked) {
            return false;
        }
        $written = pg_query_params(
            $pgconn,
            'UPDATE tracking_data_job SET locked = TRUE, updated=now() WHERE id = $1',
            [$row["id"]]
        );
    } else {
        $written = pg_query_params(
            $pgconn,
            'INSERT INTO tracking_data_job (session, locked) VALUES ($1, TRUE)',
            [$session]
        );
    }

    if ($written === false) {
        $tlLog("Could not lock session {$session}, skipping it this cycle");
        return false;
    }

    return true;
};

do {
    $tlLog('Scheduler to check tracklocation files starting');

    // Clean old jobs
    pg_query($pgconn, "DELETE FROM tracking_data_job WHERE updated + interval '1 hour' < now()");

    // filemtime() results are cached per process; drop them so each cycle sees fresh data.
    clearstatcache();

    // Sessions already examined in this cycle (locked, failed or dispatched).
    $handledSessions = [];

    $files = $tlScan($tracklocation_dir, '/^(tl_(\w|\.)*\.json)$/');

    while (count($files)) {
        // Order the candidates by modification time, oldest first.
        $mtimes = [];
        foreach ($files as $file) {
            $mtimes[$file] = filemtime($tracklocation_dir.$file);
        }
        asort($mtimes);
        $files = array_keys($mtimes);

        // Drop leading files that carry no session segment.
        $session = '';
        while (count($files)) {
            $session = $tlSessionOf(reset($files));
            if ($session !== '') {
                break;
            }
            array_shift($files);
        }
        if ($session === '') {
            break; // nothing left that belongs to a session
        }

        $handledSessions[] = $session;

        if ($tlAcquireLock($pgconn, $session)) {
            // NOTE(review): files are selected by substring match on the session, exactly
            // as before; a session that is a substring of another name also matches.
            $session_files = array_filter(
                $files,
                static function ($item) use ($session) {
                    return strpos($item, $session) !== false;
                }
            );
            foreach ($session_files as $sessionFile) {
                $from = $tracklocation_dir.$sessionFile;
                if (!rename($from, $from.'.lock')) {
                    $tlLog("Could not rename {$sessionFile} to {$sessionFile}.lock");
                }
            }

            $data = [
                'tl_session' => $session,
            ];
            $client->doBackground('tl_decompress', json_encode($data));
        }

        // Rescan, leaving out every session already handled in this cycle. Excluding only
        // the latest one lets two locked sessions bounce back and forth forever.
        $excluded = implode('|', array_map(
            static function ($handled) {
                return preg_quote($handled, '/');
            },
            $handledSessions
        ));
        $files = $tlScan(
            $tracklocation_dir,
            '/^(tl_(\d)*_((?!(?:'.$excluded.')).)+_(\w|\d|\.)*\.json)$/'
        );
    }

    $tlLog('Scheduler is going to sleep');
    sleep(60); // Sleep for 1 minute between cycles
} while (true);

// The loop above contains no break, so this line only runs if one is ever added.
echo "[".date("Y-m-d H:i:s")."] Abnormal termination!\n";