using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace MaddoShared { public static class ThreadingHelper { /// /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. /// /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The cancellation token. public static void StartAndWaitAllThrottled(IEnumerable tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } /// /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. /// /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely. /// The cancellation token. public static void StartAndWaitAllThrottled(IEnumerable tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) { var postTaskTasks = new List(); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. throttler.Wait(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. Task.WaitAll(postTaskTasks.ToArray(), cancellationToken); } } } }