1. <?php
  2. /**
  3. * Flags anomalies in message queues for scheduled runs.
  4. * Supports older PHP versions (PHP 5.6+).
  5. *
  6. * @param array $queue_data Array of message queue data. Each element should be an array
  7. * with keys like 'message_id', 'timestamp', 'processing_time', 'status'.
  8. * @param array $thresholds Array of anomaly thresholds. Defaults provided.
  9. * @return array Array of flagged anomalies. Each element is an array with details about the anomaly.
  10. */
  11. function flagQueueAnomalies(array $queue_data, array $thresholds = []): array
  12. {
  13. // Default thresholds
  14. if (empty($thresholds)) {
  15. $thresholds = [
  16. 'processing_time_high' => 5, // seconds
  17. 'processing_time_low' => 0.1, // seconds
  18. 'message_count_high' => 100, // messages
  19. 'message_count_low' => 1, // messages
  20. ];
  21. }
  22. $anomalies = [];
  23. if (empty($queue_data)) {
  24. return $anomalies; // Return empty array if no data
  25. }
  26. $previous_processing_times = [];
  27. $previous_message_counts = [];
  28. foreach ($queue_data as $item) {
  29. // Validate data structure (basic check)
  30. if (!is_array($item) || !isset($item['timestamp']) || !isset($item['processing_time']) || !isset($item['status'])) {
  31. continue; // Skip invalid data
  32. }
  33. $timestamp = $item['timestamp'];
  34. $processing_time = $item['processing_time'];
  35. $status = $item['status'];
  36. // Processing Time Anomaly Detection
  37. if ($processing_time > $thresholds['processing_time_high']) {
  38. $anomalies[] = [
  39. 'message_id' => $item['message_id'],
  40. 'timestamp' => $timestamp,
  41. 'anomaly_type' => 'processing_time_high',
  42. 'value' => $processing_time,
  43. 'threshold' => $thresholds['processing_time_high'],
  44. ];
  45. } elseif ($processing_time < $thresholds['processing_time_low']) {
  46. $anomalies[] = [
  47. 'message_id' => $item['message_id'],
  48. 'timestamp' => $timestamp,
  49. 'anomaly_type' => 'processing_time_low',
  50. 'value' => $processing_time,
  51. 'threshold' => $thresholds['processing_time_low'],
  52. ];
  53. }
  54. // Message Count Anomaly Detection
  55. if (count($queue_data) > 1 && (count($queue_data) - $previous_message_counts) > $thresholds['message_count_high']) {
  56. $anomalies[] = [
  57. 'message_id' => 'general', // Use general for aggregate anomalies
  58. 'timestamp' => $timestamp,
  59. 'anomaly_type' => 'message_count_high',
  60. 'value' => count($queue_data),
  61. 'threshold' => $thresholds['message_count_high'],
  62. ];
  63. }
  64. if (count($queue_data) > 1 && (count($queue_data) - $previous_message_counts) < $thresholds['message_count_low']) {
  65. $anomalies[] = [
  66. 'message_id' => 'general', // Use general for aggregate anomalies
  67. 'timestamp' => $timestamp,
  68. 'anomaly_type' => 'message_count_low',
  69. 'value' => count($queue_data),
  70. 'threshold' => $thresholds['message_count_low'],
  71. ];
  72. }
  73. $previous_processing_times[] = $processing_time;
  74. $previous_message_counts[] = count($queue_data);
  75. }
  76. return $anomalies;
  77. }

Add your comment