[.NET Core] Task Queue
1. TaskQueue 인터페이스 생성
/// <summary>
/// Contract for a queue of background work items. Each work item is a delegate
/// that accepts a <see cref="CancellationToken"/> and returns a <see cref="Task"/>.
/// </summary>
public interface ITaskQueue
{
    /// <summary>
    /// Adds a work item to the queue.
    /// </summary>
    /// <param name="workItem">The delegate to queue for later execution.</param>
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    /// <summary>
    /// Asynchronously takes a work item from the queue.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the dequeue operation.</param>
    /// <returns>A task whose result is the dequeued work item delegate.</returns>
    Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}
2. TaskQueue 구현체 생성
/// <summary>
/// In-memory <see cref="ITaskQueue"/> backed by a <see cref="ConcurrentQueue{T}"/>.
/// A <see cref="SemaphoreSlim"/> starting at zero is released once per enqueued
/// item, so <see cref="DequeueAsync"/> waits until an item has been queued.
/// </summary>
public class TaskQueue : ITaskQueue
{
    // Work items that have been queued but not yet handed out.
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> pending =
        new ConcurrentQueue<Func<CancellationToken, Task>>();

    // Released once for every item added to `pending`.
    private readonly SemaphoreSlim itemAvailable = new SemaphoreSlim(0);

    /// <summary>
    /// Adds a work item to the queue and signals one waiting consumer.
    /// </summary>
    /// <param name="workItem">The delegate to queue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        pending.Enqueue(workItem ?? throw new ArgumentNullException(nameof(workItem)));
        itemAvailable.Release();
    }

    /// <summary>
    /// Waits for the semaphore to be signalled, then removes an item from the queue.
    /// </summary>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    /// <returns>
    /// The dequeued work item, or null if the dequeue attempt did not yield an item.
    /// </returns>
    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        await itemAvailable.WaitAsync(cancellationToken);

        return pending.TryDequeue(out var next) ? next : null;
    }
}
3. 백그라운드에서 TaskQueue 작업을 처리하는 서비스 구현
/// <summary>
/// Hosted background service that repeatedly takes work items from the
/// <see cref="ITaskQueue"/> and runs them, one at a time, until the host
/// requests shutdown.
/// </summary>
public class TaskQueueWorker : BackgroundService
{
    private readonly ILogger logger;

    /// <summary>The queue this worker takes its work items from.</summary>
    public ITaskQueue TaskQueue { get; }

    /// <summary>
    /// Creates the worker and resolves the task queue from the container.
    /// </summary>
    /// <param name="serviceScopeFactory">Factory used to create a scope for resolving the queue.</param>
    /// <param name="logger">Logger for start-up and work item failure messages.</param>
    public TaskQueueWorker(IServiceScopeFactory serviceScopeFactory,
        ILogger<TaskQueueWorker> logger)
    {
        // The queue is registered under ITaskQueue as a singleton, so the
        // instance stays usable after this temporary scope is disposed.
        using var scope = serviceScopeFactory.CreateScope();
        TaskQueue = scope.ServiceProvider.GetRequiredService<ITaskQueue>();
        this.logger = logger;
    }

    /// <summary>
    /// Dequeues and executes work items until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("Task Work Start");

        while (!stoppingToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(stoppingToken);

            try
            {
                // The delegate returns an already-running task; calling Start() on
                // it would throw InvalidOperationException. Awaiting it also lets
                // failures in the work item reach the catch block below.
                await workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                // A failed work item is logged and must not stop the worker loop.
                logger.LogError(ex, "Error occurred executing {WorkItem}", nameof(workItem));
            }
        }
    }
}
4. 서비스 등록
...
// Register the queue as a singleton and the worker as a hosted service.
services
    .AddSingleton<ITaskQueue, TaskQueue>()
    .AddHostedService<TaskQueueWorker>();
...