import time
import tempfile
import shutil
import os
import threading
def cleanup_artifacts(func, timeout, *args, **kwargs):
    """
    Executes a function with a timeout and cleans up temporary files
    created during execution.

    While ``func`` runs, ``tempfile.tempdir`` is pointed at a private scratch
    directory, so anything ``func`` creates through the ``tempfile`` module
    lands there. The scratch directory is removed when this call returns;
    nothing else in the system temp directory is touched.

    Note: ``tempfile.tempdir`` is process-global, so overlapping calls from
    several threads can interfere with each other's temp location.

    Args:
        func: The function to execute.
        timeout: The maximum execution time in seconds (``None`` waits
            indefinitely).
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        The return value of the function if it completes within the timeout.
        None if the function raised an exception or did not finish in time.
        Also ensures cleanup.
    """
    # The worker reports back through this dict; a plain assignment inside
    # the nested function would only create a new local variable.
    outcome = {}

    def run_target():
        try:
            outcome["value"] = func(*args, **kwargs)
        except Exception as e:
            outcome["error"] = e

    # Create the scratch directory before redirecting tempfile.tempdir so it
    # is placed inside the regular temp location.
    scratch_dir = tempfile.mkdtemp(prefix="cleanup_artifacts_")
    previous_tempdir = tempfile.tempdir
    tempfile.tempdir = scratch_dir

    thread = threading.Thread(target=run_target)
    thread.daemon = True  # Allow the main thread to exit even if this thread is running
    try:
        thread.start()
        thread.join(timeout)  # Wait for the thread to finish (with a timeout)
        # A thread cannot be killed; if it is still alive the call overran
        # and is abandoned to finish (or die with the process) on its own.
        timed_out = thread.is_alive()
    finally:
        tempfile.tempdir = previous_tempdir
        try:
            shutil.rmtree(scratch_dir)
        except Exception as e:
            # Best effort: an abandoned worker may still be writing here.
            print(f"Error during cleanup: {e}")

    if timed_out:
        print(f"Function timed out after {timeout} seconds.")
        return None

    if "error" in outcome:
        print(f"Function execution failed: {outcome['error']}")
        return None  # Indicate failure

    return outcome.get("value")  # Return the result if successful
# Add your comment