import json
import re
import threading
import time
class ArtifactCleaner:
    """
    Keeps artifacts in memory and discards the ones that have expired.

    Artifacts live in ``self.artifacts`` in insertion order. ``cleanup()``
    reads ``artifact['timestamp']`` from each one, so every stored artifact
    must support that lookup and yield a ``time.time()``-style number.
    """

    def __init__(self, cleanup_interval=60):
        """
        Initializes the ArtifactCleaner.

        Args:
            cleanup_interval (int): Maximum artifact age, in seconds, and
                also the pause between cleanup passes in ``run()``.
                Defaults to 60 seconds.
        """
        self.cleanup_interval = cleanup_interval
        self.artifacts = []  # Store artifacts (e.g., response bodies)
        self.last_cleanup = 0  # time.time() of the last pass finished by run()
        # run() is meant to loop in a background thread while other threads
        # keep storing artifacts, so list mutations are serialized.
        self._lock = threading.Lock()

    def store_artifact(self, artifact):
        """
        Stores an artifact.

        Args:
            artifact (any): The artifact to store. It must expose a numeric
                ``artifact['timestamp']`` for ``cleanup()`` to age it.
        """
        with self._lock:
            self.artifacts.append(artifact)

    def cleanup(self):
        """
        Cleans up artifacts older than the specified interval.

        An artifact is removed when ``now - artifact['timestamp']`` is
        strictly greater than ``cleanup_interval``. The list is filtered in
        a single pass and updated in place, so references to
        ``self.artifacts`` held elsewhere stay valid. If any artifact lacks
        a usable timestamp the lookup error propagates and nothing is
        removed.
        """
        now = time.time()
        max_age = self.cleanup_interval
        with self._lock:
            survivors = [
                artifact
                for artifact in self.artifacts
                if not (now - artifact['timestamp']) > max_age
            ]
            removed_count = len(self.artifacts) - len(survivors)
            # Slice assignment mutates the existing list object rather than
            # rebinding the attribute.
            self.artifacts[:] = survivors
        print(f"Cleaned up {removed_count} artifacts.")

    def run(self):
        """
        Runs the cleanup process periodically.

        Loops forever: performs a cleanup pass, records its completion time
        in ``last_cleanup``, then sleeps for ``cleanup_interval`` seconds.
        This call never returns.
        """
        while True:
            self.cleanup()
            self.last_cleanup = time.time()
            time.sleep(self.cleanup_interval)
_REDACTION_MARKER = '[REDACTED]'

# A sensitive key name, optionally followed by ":" or "=" and its value.
# The value is either a quoted string or a bare token that ends at
# whitespace or a common delimiter.
_SENSITIVE_RE = re.compile(
    r"\b(?:API_KEY|password|secret)\b"
    r"(?:(?P<sep>[\"']?[ \t]*[:=][ \t]*)"
    r"(?:\"[^\"\n]*\"|'[^'\n]*'|[^\s,;&\"'{}\[\]]+))?",
    re.IGNORECASE,
)
_JSON_OBJECT_RE = re.compile(r'\{.*\}')
_JSON_ARRAY_RE = re.compile(r'\[.*\]')


def _redact_match(match):
    """Return the marker for a key, plus a second marker for its value."""
    separator = match.group('sep')
    if separator is None:
        return _REDACTION_MARKER
    return f"{_REDACTION_MARKER}{separator}{_REDACTION_MARKER}"


def clean_response(response_text):
    """
    Cleans a response text by removing sensitive information.

    Bracketed spans are stripped first and redaction runs second. In the
    opposite order the square-bracket strip would delete the
    ``[REDACTED]`` marker itself, together with any text lying between two
    markers on the same line.

    Args:
        response_text (str): The raw response text.

    Returns:
        str: The cleaned response text. The words ``API_KEY``, ``password``
        and ``secret`` (case-insensitive, whole words) are replaced by
        ``[REDACTED]``; when such a word is followed by ``:`` or ``=`` and a
        value, the value is replaced as well.
    """
    # Remove JSON formatting (optional, for brevity).
    # NOTE: both patterns are greedy and "." does not match newlines, so on
    # each line everything from the first opening bracket to the last
    # closing bracket is dropped.
    response_text = _JSON_OBJECT_RE.sub('', response_text)
    response_text = _JSON_ARRAY_RE.sub('', response_text)
    # Redact sensitive key names and the values assigned to them.
    return _SENSITIVE_RE.sub(_redact_match, response_text)
if __name__ == '__main__':
    # Demo: store a batch of simulated API responses, then let the cleaner
    # run in the background for a short while.
    cleaner = ArtifactCleaner(cleanup_interval=10)  # Cleanup every 10 seconds

    # Simulate API calls and artifact storage
    for i in range(15):
        # Simulate an API response
        response_data = {"timestamp": time.time(), "data": f"This is some data {i}"}
        response_text = json.dumps(response_data)
        cleaned_response = clean_response(response_text)
        # cleanup() reads artifact['timestamp'], so store a timestamped
        # record holding the sanitized body rather than the bare JSON
        # string, which would raise TypeError on that lookup.
        cleaner.store_artifact(
            {"timestamp": response_data["timestamp"], "body": cleaned_response}
        )
        time.sleep(2)  # Simulate API call frequency

    # Start the cleanup process in the background. The thread is a daemon so
    # the main thread can exit even though run() never returns.
    cleanup_thread = threading.Thread(target=cleaner.run, daemon=True)
    cleanup_thread.start()

    time.sleep(15)  # Let the cleanup thread run for a bit
    print("Done.")
# Add your comment