<?php
/**
 * Flags anomalies in a batch of message-queue records for scheduled runs.
 *
 * Requires PHP 7.0+ (the signature uses a return type declaration).
 *
 * Two kinds of anomaly are reported:
 *  - Per message: 'processing_time_high' / 'processing_time_low' when a record's
 *    processing time is strictly above / below the configured bound.
 *  - Per batch: 'message_count_high' / 'message_count_low' when the number of
 *    entries in $queue_data is strictly above / below the configured bound.
 *    These are emitted at most once each, after the per-message anomalies, with
 *    'message_id' set to 'general' and the timestamp of the last valid record.
 *    Batches holding a single entry, or no valid record at all, are exempt from
 *    the count checks.
 *
 * Entries that are not arrays, that lack 'timestamp', 'processing_time' or
 * 'status', or whose 'processing_time' is not numeric are skipped silently.
 *
 * @param array $queue_data Array of message queue data. Each element should be an array
 *                          with the keys 'timestamp', 'processing_time' and 'status';
 *                          'message_id' is optional and reported as null when absent.
 * @param array $thresholds Anomaly thresholds keyed by 'processing_time_high',
 *                          'processing_time_low' (seconds), 'message_count_high' and
 *                          'message_count_low' (messages). Any key left out falls back
 *                          to its default.
 * @return array List of flagged anomalies. Each element is an array with the keys
 *               'message_id', 'timestamp', 'anomaly_type', 'value' and 'threshold'.
 */
function flagQueueAnomalies(array $queue_data, array $thresholds = []): array
{
    $defaults = [
        'processing_time_high' => 5,   // seconds
        'processing_time_low'  => 0.1, // seconds
        'message_count_high'   => 100, // messages
        'message_count_low'    => 1,   // messages
    ];
    // Array union keeps every caller-supplied key and only fills in the missing
    // ones, so a partial override no longer leaves thresholds undefined.
    $thresholds = $thresholds + $defaults;

    $anomalies = [];
    if (empty($queue_data)) {
        return $anomalies; // Nothing to inspect.
    }

    // Timestamp of the most recent valid record; stays null when none is valid.
    $batch_timestamp = null;

    foreach ($queue_data as $item) {
        // Basic structural validation: skip anything that cannot be evaluated.
        if (
            !is_array($item)
            || !isset($item['timestamp'], $item['processing_time'], $item['status'])
            || !is_numeric($item['processing_time'])
        ) {
            continue;
        }

        $timestamp = $item['timestamp'];
        $processing_time = $item['processing_time'];
        $batch_timestamp = $timestamp;

        // The anomaly type doubles as the key of the threshold that was crossed.
        $anomaly_type = null;
        if ($processing_time > $thresholds['processing_time_high']) {
            $anomaly_type = 'processing_time_high';
        } elseif ($processing_time < $thresholds['processing_time_low']) {
            $anomaly_type = 'processing_time_low';
        }

        if ($anomaly_type !== null) {
            $anomalies[] = [
                // 'message_id' is not part of the validation above, so guard it.
                'message_id' => $item['message_id'] ?? null,
                'timestamp' => $timestamp,
                'anomaly_type' => $anomaly_type,
                'value' => $processing_time,
                'threshold' => $thresholds[$anomaly_type],
            ];
        }
    }

    // The batch size does not change while iterating, so the count checks are
    // evaluated once here instead of once per record.
    $message_count = count($queue_data);
    if ($batch_timestamp !== null && $message_count > 1) {
        if ($message_count > $thresholds['message_count_high']) {
            $anomalies[] = [
                'message_id' => 'general', // Use general for aggregate anomalies
                'timestamp' => $batch_timestamp,
                'anomaly_type' => 'message_count_high',
                'value' => $message_count,
                'threshold' => $thresholds['message_count_high'],
            ];
        }
        if ($message_count < $thresholds['message_count_low']) {
            $anomalies[] = [
                'message_id' => 'general', // Use general for aggregate anomalies
                'timestamp' => $batch_timestamp,
                'anomaly_type' => 'message_count_low',
                'value' => $message_count,
                'threshold' => $thresholds['message_count_low'],
            ];
        }
    }

    return $anomalies;
}
Add your comment