import asyncio
import aiofiles
import threading
import queue
import time
class FileBuffer:
    """Bounded FIFO buffer of file paths backed by ``queue.Queue``.

    ``queue.Queue`` does its own locking, so ``enqueue``/``dequeue`` call it
    directly. ``stop_event`` is a flag that consumers can poll to know when
    they should stop pulling work.
    """

    def __init__(self, queue_size=10):
        """Create a buffer holding at most ``queue_size`` paths.

        A ``queue_size`` of zero or less makes the underlying queue unbounded.
        """
        self.queue = queue.Queue(maxsize=queue_size)
        # Kept as a public attribute for existing callers. It is deliberately
        # NOT held around blocking queue operations (see ``enqueue``).
        self.lock = threading.Lock()
        self.stop_event = threading.Event()

    def enqueue(self, filepath, block=True, timeout=None):
        """Add ``filepath`` to the buffer.

        By default this blocks while the buffer is full. Pass ``block=False``
        or a ``timeout`` (seconds) to raise ``queue.Full`` instead of waiting
        indefinitely.
        """
        # No outer lock here: holding one across a blocking put() would park
        # the producer with the lock held whenever the buffer is full, and
        # every other lock user would then deadlock behind it.
        self.queue.put(filepath, block=block, timeout=timeout)

    def dequeue(self, block=True, timeout=None):
        """Remove and return the oldest path.

        By default this blocks while the buffer is empty. Pass ``block=False``
        or a ``timeout`` (seconds) to raise ``queue.Empty`` instead.
        """
        return self.queue.get(block=block, timeout=timeout)

    def is_empty(self):
        """Return True if the buffer currently holds no paths.

        The answer is only a snapshot; another thread may add or remove an
        item immediately afterwards.
        """
        return self.queue.empty()

    def stop(self):
        """Set ``stop_event`` so that consumers polling it can stop."""
        self.stop_event.set()

    def get_all(self):
        """Return a list copy of the buffered paths, oldest first.

        The buffer itself is left unchanged.
        """
        # Use the queue's own mutex so the copy cannot interleave with a
        # concurrent put()/get() on the underlying deque.
        with self.queue.mutex:
            return list(self.queue.queue)
async def process_file(filepath, delay=2):
    """Simulate a short-lived synchronous task that processes one file.

    Args:
        filepath: Path of the file being "processed"; only used in the
            progress messages.
        delay: Seconds of simulated blocking work. Defaults to 2.

    The blocking ``time.sleep`` runs in the event loop's default executor so
    that other coroutines keep running while this one waits. Calling it
    directly inside the coroutine would stall the whole event loop.
    """
    print(f"Processing file: {filepath}")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, delay)  # Simulate processing time
    print(f"Finished processing: {filepath}")
async def main(file_buffer):
    """Drain ``file_buffer`` and process each path in its own asyncio task.

    Args:
        file_buffer: Object providing ``is_empty()``, ``dequeue()`` and a
            ``stop_event`` with ``is_set()``.

    Paths are pulled until the buffer is empty or its stop event is set. The
    function returns only after every task it started has finished, and an
    exception raised by a task propagates to the caller.
    """
    # Hold strong references: the event loop only keeps weak references to
    # tasks, and asyncio.run() cancels whatever is still pending when this
    # coroutine returns.
    tasks = []
    while not file_buffer.is_empty() and not file_buffer.stop_event.is_set():
        # The is_empty() check above keeps this blocking call from waiting
        # as long as this coroutine is the only consumer of the buffer.
        filepath = file_buffer.dequeue()
        tasks.append(asyncio.create_task(process_file(filepath)))
        await asyncio.sleep(0.1)  # Allow other tasks to run
    if tasks:
        await asyncio.gather(*tasks)
if __name__ == "__main__":
    file_paths = ["file1.txt", "file2.txt", "file3.txt", "file4.txt", "file5.txt"]
    # Size the buffer to hold every path. All paths are enqueued before any
    # consumer runs, so a smaller bounded queue would block forever on the
    # first enqueue past its capacity.
    buffer = FileBuffer(queue_size=len(file_paths))
    # Enqueue files
    for file_path in file_paths:
        buffer.enqueue(file_path)
    # Start the processing tasks
    asyncio.run(main(buffer))
    print("All files processed.")
# Add your comment