Imagine I have 100 tasks to run, but my machine has only 16 cores. I want every core to run one task in parallel and, as soon as a core becomes idle, to assign a new task to it.

That is very easy to do in C# with Task.

But before starting, we are going to create a thread-safe queue for storing our tasks.

    /// <summary>
    /// A minimal thread-safe FIFO queue: every operation on the inner
    /// <see cref="Queue{T}"/> runs while holding one private lock.
    /// </summary>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public class SafeQueue<T>
    {
        private readonly Queue<T> queue = new Queue<T>();
        private readonly object loc = new object();

        /// <summary>Adds <paramref name="item"/> to the tail of the queue.</summary>
        /// <param name="item">The item to store.</param>
        public void Enqueue(T item)
        {
            lock (loc)
            {
                queue.Enqueue(item);
            }
        }

        /// <summary>Removes and returns the item at the head of the queue.</summary>
        /// <returns>The oldest item in the queue.</returns>
        /// <exception cref="InvalidOperationException">The queue is empty.</exception>
        public T Dequeue()
        {
            lock (loc)
            {
                return queue.Dequeue();
            }
        }

        /// <summary>
        /// Removes the head item if there is one. The emptiness check and the removal
        /// happen under the same lock, so this is safe with several concurrent consumers,
        /// unlike calling <see cref="Any"/> followed by <see cref="Dequeue"/>.
        /// </summary>
        /// <param name="item">The removed item, or the default value of <typeparamref name="T"/> when the queue was empty.</param>
        /// <returns><c>true</c> if an item was removed; otherwise <c>false</c>.</returns>
        public bool TryDequeue(out T item)
        {
            lock (loc)
            {
                if (queue.Count > 0)
                {
                    item = queue.Dequeue();
                    return true;
                }
            }

            item = default;
            return false;
        }

        /// <summary>Reports whether the queue currently holds at least one item.</summary>
        /// <returns><c>true</c> if the queue is not empty at the moment of the call.</returns>
        public bool Any()
        {
            lock (loc)
            {
                // Count is O(1) and needs no LINQ enumerator while the lock is held.
                return queue.Count > 0;
            }
        }
    }

Once we have the queue, we can build a new independent service that adds new tasks to the queue.

    /// <summary>
    /// Collects task factories in a thread-safe queue and runs them with a bounded
    /// number of tasks in flight.
    /// </summary>
    public class CannonQueue
    {
        private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
        private Task _engine = Task.CompletedTask;

        /// <summary>
        /// Adds a task factory to the queue and starts an engine loop to process the queue.
        /// </summary>
        /// <param name="taskFactory">Delegate that creates (and thereby starts) the task to run.</param>
        public void QueueNew(Func<Task> taskFactory)
        {
            _pendingTaskFactories.Enqueue(taskFactory);

            // Every call starts another engine loop, so the parallelism limit
            // applies per loop rather than to the queue as a whole.
            _engine = RunTasksInQueue();
        }

        /// <summary>
        /// Engine loop: keeps up to <paramref name="maxDegreeOfParallelism"/> tasks in flight
        /// and returns only when the queue is empty and every task it started has finished.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">Maximum number of tasks this loop runs at once.</param>
        /// <returns>A task that completes when the loop has drained its work; it faults if a started task faults.</returns>
        private async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
        {
            var tasksInFlight = new List<Task>(maxDegreeOfParallelism);

            // Keep looping while tasks are still running, even if the queue is already empty;
            // otherwise the last tasks would never be awaited and the engine would report
            // completion while work is still in progress.
            while (_pendingTaskFactories.Any() || tasksInFlight.Count > 0)
            {
                while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
                {
                    Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                    tasksInFlight.Add(taskFactory());
                }

                if (tasksInFlight.Count == 0)
                {
                    // Nothing was started (the queue was emptied in the meantime);
                    // Task.WhenAny rejects an empty collection, so re-check the loop condition.
                    continue;
                }

                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

                // Free the slot first, then await to surface the task's exception, if any.
                tasksInFlight.Remove(completedTask);
                await completedTask.ConfigureAwait(false);
            }
        }
    }

But that code only starts the engine when a task is added. What if we need to add more tasks dynamically to the task pool while the engine is already running?

Consider restarting the engine every time we add a task, but only if it is not already running.

Modify the QueueNew method.

        /// <summary>
        /// Adds a task factory to the queue and, on a thread-pool thread, restarts the
        /// engine if the previous engine task has already completed.
        /// </summary>
        /// <param name="taskFactory">Delegate that creates (and thereby starts) the task to run.</param>
        public void QueueNew(Func<Task> taskFactory)
        {
            _pendingTaskFactories.Enqueue(taskFactory);

            // Task.Run always schedules on the thread pool. Task.Factory.StartNew would use
            // TaskScheduler.Current, which may be a custom or UI scheduler of the caller.
            Task.Run(() =>
            {
                // The lock makes "check IsCompleted, then replace _engine" a single step,
                // so two concurrent callers cannot both start an engine.
                lock (loc)
                {
                    // NOTE(review): an engine that has already seen an empty queue but has not
                    // yet been marked completed will not pick up this item; it then waits
                    // until the next QueueNew call.
                    if (_engine.IsCompleted)
                    {
                        Console.WriteLine("Engine is sleeping. Trying to wake it up.");
                        _engine = RunTasksInQueue();
                    }
                }
            });
        }

Now it runs well.

You may also need dependency injection: if your task needs to access dependencies such as a database or an HTTP client, you need to keep those dependencies alive for as long as the task runs.

So we can add a new QueueWithDependency method.

The final code will be:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Aiursoft.XelNaga.Services
{
    /// <summary>
    /// A fire-and-forget background queue. Task factories are stored in a thread-safe queue
    /// and executed by a single engine loop that keeps a bounded number of tasks in flight.
    /// The engine is started on demand and stops once all work is done.
    /// </summary>
    public class CannonQueue
    {
        private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly ILogger<CannonQueue> _logger;

        // Guards _engineRunning and _engine.
        private readonly object loc = new object();

        // The most recently started engine loop.
        private Task _engine = Task.CompletedTask;

        // True from the moment an engine is scheduled until that engine decides, under the
        // lock, that the queue is empty and it will exit. Task.IsCompleted cannot be used
        // for this: it flips only after the engine has made its exit decision, so an item
        // enqueued in between would never be picked up.
        private bool _engineRunning;

        /// <summary>Creates the queue.</summary>
        /// <param name="serviceScopeFactory">Factory used to create one DI scope per dependency-based task.</param>
        /// <param name="logger">Logger for engine diagnostics and task failures.</param>
        public CannonQueue(
            IServiceScopeFactory serviceScopeFactory,
            ILogger<CannonQueue> logger)
        {
            _scopeFactory = serviceScopeFactory;
            _logger = logger;
        }

        /// <summary>
        /// Queues a task factory for background execution and wakes the engine if it is idle.
        /// Returns immediately; the task is not awaited by the caller.
        /// </summary>
        /// <param name="taskFactory">Delegate that creates (and thereby starts) the task to run.</param>
        /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
        public void QueueNew(Func<Task> taskFactory)
        {
            if (taskFactory is null)
            {
                throw new ArgumentNullException(nameof(taskFactory));
            }

            _pendingTaskFactories.Enqueue(taskFactory);

            lock (loc)
            {
                if (_engineRunning)
                {
                    // The running engine re-checks the queue under this same lock before
                    // it exits, so it is guaranteed to see the item enqueued above.
                    return;
                }

                _engineRunning = true;
                _logger.LogDebug("Engine is sleeping. Trying to wake it up.");

                // Task.Run keeps the synchronous part of the queued factories off the
                // caller's thread and always uses the thread pool.
                _engine = Task.Run(() => RunTasksInQueue());
            }
        }

        /// <summary>
        /// Queues work that needs a service of type <typeparamref name="T"/>. A new DI scope is
        /// created for the work and disposed once it has finished; exceptions thrown by
        /// <paramref name="bullet"/> are logged and not rethrown.
        /// </summary>
        /// <typeparam name="T">The service type resolved from the new scope.</typeparam>
        /// <param name="bullet">The work to run with the resolved service.</param>
        /// <exception cref="ArgumentNullException"><paramref name="bullet"/> is null.</exception>
        public void QueueWithDependency<T>(Func<T, Task> bullet)
        {
            if (bullet is null)
            {
                throw new ArgumentNullException(nameof(bullet));
            }

            QueueNew(async () =>
            {
                // The scope must outlive the awaited work so scoped services stay usable.
                using var scope = _scopeFactory.CreateScope();
                var dependency = scope.ServiceProvider.GetRequiredService<T>();
                try
                {
                    await bullet(dependency);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "An error occurred with Cannon. Dependency: {Dependency}.", typeof(T).Name);
                }
            });
        }

        /// <summary>
        /// Engine loop: keeps up to <paramref name="maxDegreeOfParallelism"/> tasks in flight and
        /// exits only when the queue is empty and every started task has finished. A failing
        /// task is logged and does not stop the loop.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">Maximum number of tasks running at once.</param>
        private async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
        {
            var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
            while (true)
            {
                // This loop is the only consumer, so Any() followed by Dequeue() cannot race.
                while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
                {
                    Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                    tasksInFlight.Add(StartTask(taskFactory));
                }

                if (tasksInFlight.Count == 0)
                {
                    lock (loc)
                    {
                        // Decide to stop under the lock QueueNew uses, so an item enqueued
                        // concurrently is either seen here or starts a new engine.
                        if (!_pendingTaskFactories.Any())
                        {
                            _engineRunning = false;
                            return;
                        }
                    }

                    continue;
                }

                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
                tasksInFlight.Remove(completedTask);

                try
                {
                    // Await to observe the outcome; a failure must not take the engine down,
                    // otherwise the remaining queue would never be processed.
                    await completedTask.ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    _logger.LogError(e, "A task in the Cannon queue failed.");
                }
            }
        }

        /// <summary>
        /// Invokes a task factory, turning a synchronous exception or a null result into a
        /// task so the engine loop can handle every outcome the same way.
        /// </summary>
        private static Task StartTask(Func<Task> taskFactory)
        {
            try
            {
                return taskFactory() ?? Task.CompletedTask;
            }
            catch (Exception e)
            {
                return Task.FromException(e);
            }
        }
    }
}

Don't forget to register that as a singleton dependency!

// Register one shared CannonQueue instance so every consumer enqueues into the same queue.
services.AddSingleton<CannonQueue>();