123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
namespace WebDAVSharp.Server.Utilities
{
    /// <summary>
    /// A task scheduler that runs tasks on the thread pool while never letting more than
    /// a fixed number of worker delegates process this scheduler's tasks at the same time.
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        // True while the current thread is inside this scheduler's worker loop.
        // Used to decide whether a task may be inlined onto the calling thread.
        [ThreadStatic] private static bool _currentThreadIsProcessingItems;

        // The maximum concurrency level allowed by this scheduler.
        private readonly int _maxDegreeOfParallelism;

        // The tasks waiting to be executed, in FIFO order. Also serves as the lock
        // object guarding both the list and _delegatesQueuedOrRunning.
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

        // Number of worker delegates that have been handed to the thread pool and have
        // not yet drained the queue and exited. Protected by lock(_tasks).
        private int _delegatesQueuedOrRunning;

        /// <summary>
        /// Creates a new instance with the specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">
        /// The maximum number of worker delegates allowed to process tasks concurrently; must be at least 1.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
            }

            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>
        /// Gets the maximum concurrency level supported by this scheduler.
        /// </summary>
        public sealed override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;

        /// <summary>
        /// Queues a task to the scheduler.
        /// </summary>
        /// <param name="task">The task to append to the pending queue.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed. If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Hands the thread pool a worker delegate that executes queued tasks until the queue is empty.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed, note that this
                            // worker is done and get out. The count is decremented under the
                            // same lock QueueTask uses, so a task added after this point
                            // always sees the freed slot and schedules a new worker.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue.
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task outside the lock so other threads can keep queueing.
                        TryExecuteTask(item);
                    }
                }
                finally
                {
                    // We're done processing items on the current thread.
                    _currentThreadIsProcessingItems = false;
                }
            }, null);
        }

        /// <summary>
        /// Attempts to execute the specified task on the current thread.
        /// </summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="taskWasPreviouslyQueued">
        /// Whether the task was already queued to this scheduler; if so it must be removed
        /// from the pending queue before it may run inline.
        /// </param>
        /// <returns>
        /// <c>true</c> if the task was executed inline; <c>false</c> if the current thread is not
        /// one of this scheduler's workers, the task could not be dequeued, or execution was refused.
        /// </returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining.
            if (!_currentThreadIsProcessingItems)
            {
                return false;
            }

            // A previously queued task may only run here if we win the race to remove it
            // from the queue; otherwise a worker has already taken (or will take) it.
            if (taskWasPreviouslyQueued && !TryDequeue(task))
            {
                return false;
            }

            return TryExecuteTask(task);
        }

        /// <summary>
        /// Attempts to remove a previously scheduled task from the scheduler.
        /// </summary>
        /// <param name="task">The task to remove from the pending queue.</param>
        /// <returns><c>true</c> if the task was found in the queue and removed; otherwise <c>false</c>.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks)
            {
                return _tasks.Remove(task);
            }
        }

        /// <summary>
        /// Gets a snapshot of the tasks currently waiting to be executed by this scheduler.
        /// </summary>
        /// <returns>A copy of the pending task queue, taken while holding the queue lock.</returns>
        /// <exception cref="NotSupportedException">
        /// The queue lock could not be acquired immediately; this method never blocks waiting for it.
        /// </exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (!lockTaken)
                {
                    throw new NotSupportedException();
                }

                // Copy while the lock is held. Returning _tasks itself would let the caller
                // enumerate the live list after the lock is released, racing with workers
                // and QueueTask that mutate it.
                return new List<Task>(_tasks);
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_tasks);
                }
            }
        }
    }
}
|