Skip to content

Commit

Permalink
background processing
Browse files Browse the repository at this point in the history
  • Loading branch information
yodobrin committed Dec 21, 2022
1 parent 455859a commit a342e64
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 14 deletions.
45 changes: 37 additions & 8 deletions aca/Controllers/BlobController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ public class BlobController : ControllerBase
private readonly ILogger<BlobController> _logger;
private IConfiguration _configuration;

private IBackgroundTaskQueue _queue;

const string BLOB = "b";
const string CONTAINER = "c";
const string SAMPLE = "s";

const string TEMP_LOC = "sample";

public BlobController(ILogger<BlobController> logger, IConfiguration configuration)
public BlobController(ILogger<BlobController> logger, IConfiguration configuration, IBackgroundTaskQueue queue)
{
_logger = logger;
_configuration = configuration;
_queue = queue;
}


Expand Down Expand Up @@ -52,12 +55,25 @@ public async Task<ActionResult<string>> CopyBlob(BlobRequest item)
// copy single file
}else if(CONTAINER.Equals(item.RequestType)){
_logger.LogInformation($"BlobController::CopyBlob::Copy entire container content.");
// creating a background task
var workItem = new Func<CancellationToken, ValueTask>(async token =>
{
_logger.LogInformation(
$"Starting work item {item.RequestType} at: {DateTimeOffset.Now}");
// do the copy here
await CopyContainer(sourceBlobClient,targetBlobClient,5000,sas);
_logger.LogInformation($"Work item {item.RequestType} completed at: {DateTimeOffset.Now}");

});
await _queue.QueueBackgroundWorkItemAsync(workItem);

return Accepted($"Copy container task Created: {item.SourceContainer} to {item.TargetContainer}");
// copy entire container
await CopyContainer(sourceBlobClient,targetBlobClient,5000,sas);
return $"Copied container {item.SourceContainer} to {item.TargetContainer}";
// await CopyContainer(sourceBlobClient,targetBlobClient,5000,sas);
// return $"Copied container {item.SourceContainer} to {item.TargetContainer}";
}else if(SAMPLE.Equals(item.RequestType) && !string.IsNullOrEmpty(item.BlobName)){
_logger.LogInformation($"BlobController::CopyBlob::Creating samples.");
long btime = DateTime.Now.Ticks;
// long btime = DateTime.Now.Ticks;
// download the file to a temporary location (sample container)
BlobContainerClient localBlobClient = new BlobContainerClient(sourceCS,TEMP_LOC);
localBlobClient.CreateIfNotExists();
Expand All @@ -66,10 +82,23 @@ public async Task<ActionResult<string>> CopyBlob(BlobRequest item)
BlobClient localBlob = localBlobClient.GetBlobClient(localFileTemp);
Uri uri = new Uri(item.BlobName);
localBlob.StartCopyFromUri(uri);
_logger.LogInformation($"BlobController::CopyBlob::Creating samples::local copy completed - copy to designated container started");
await CreateSample(localBlob,targetBlobClient,item.SampleSize);
long etime = DateTime.Now.Ticks;
return $"Created {item.SampleSize} samples in {item.TargetContainer}, took {(etime-btime)/TimeSpan.TicksPerSecond} seconds ";
_logger.LogInformation($"BlobController::CopyBlob::Creating samples::local copy completed - copy to designated container task starting.");
// creating a background task
var workItem = new Func<CancellationToken, ValueTask>(async token =>
{
_logger.LogInformation(
$"Starting work item {item.RequestType} at: {DateTimeOffset.Now}");
// do the copy here
await CreateSample(localBlob,targetBlobClient,item.SampleSize);
_logger.LogInformation($"Work item {item.RequestType} completed at: {DateTimeOffset.Now}");

});
await _queue.QueueBackgroundWorkItemAsync(workItem);

return Accepted($"Sample task Created: creating {item.SampleSize} samples in {item.TargetContainer}");

// long etime = DateTime.Now.Ticks;
// return $"Created {item.SampleSize} samples in {item.TargetContainer}, took {(etime-btime)/TimeSpan.TicksPerSecond} seconds ";
}else{

// wrong type passed
Expand Down
15 changes: 11 additions & 4 deletions aca/Program.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using Azure.Identity;
// using Microsoft.AspNetCore.HttpOverrides;

// using Microsoft.OpenApi.Models;



var builder = WebApplication.CreateBuilder(args);

// // when running localy and your user is granted right rbac on the keyvault, u can use this:
// when running localy and your user is granted right rbac on the keyvault, u can use this:
// builder.Configuration.AddAzureKeyVault(
// new Uri($"https://{builder.Configuration["keyvault"]}.vault.azure.net/"),
// new Uri($"https://ext-lake-kv.vault.azure.net/"),
// new DefaultAzureCredential());

builder.Configuration.AddAzureKeyVault(
Expand All @@ -21,6 +20,14 @@

// Add services to the container.


builder.Services.AddSingleton<IBackgroundTaskQueue>(ctx =>
{
return new BackgroundTaskQueue(100);
});

builder.Services.AddHostedService<BackgroundWorker>();

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
Expand Down
52 changes: 52 additions & 0 deletions aca/Workers/BackgroundWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

public class BackgroundWorker : BackgroundService
{
private readonly ILogger<BackgroundWorker> _logger;

public BackgroundWorker(IBackgroundTaskQueue taskQueue,
ILogger<BackgroundWorker> logger)
{
TaskQueue = taskQueue;
_logger = logger;
}

public IBackgroundTaskQueue TaskQueue { get; }

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Queued Hosted Service running.");
await BackgroundProcessing(stoppingToken);
}

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{


var workItem =
await TaskQueue.DequeueAsync(stoppingToken);
_logger.LogInformation("Accepted task at: {time}", DateTimeOffset.Now);

try
{
await workItem(stoppingToken);
_logger.LogInformation("Completed task at: {time}", DateTimeOffset.Now);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}

public override async Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Queued Hosted Service is stopping.");

await base.StopAsync(stoppingToken);
}
}


48 changes: 48 additions & 0 deletions aca/Workers/IbackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System.Threading.Channels;

public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

public BackgroundTaskQueue(int capacity)
{
// Capacity should be set based on the expected application load and
// number of concurrent threads accessing the queue.
// BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
// which completes only when space became available. This leads to backpressure,
// in case too many publishers/calls start accumulating.
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}

public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}

await _queue.Writer.WriteAsync(workItem);
}

public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);

return workItem;
}
}

2 changes: 1 addition & 1 deletion aca/aca.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<ItemGroup>
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
<PackageReference Include="Azure.Extensions.AspNetCore.Configuration.Secrets" Version="1.2.2" />
<PackageReference Include="Azure.Identity" Version="1.6.1" />
Expand Down
32 changes: 31 additions & 1 deletion aca/test.rest
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ Content-Type: application/json
"requestType": "c"
}

### test background
POST https://localhost:7179/api/Blob
accept: text/plain
Content-Type: application/json

{
"sourceCS": "sourcecs",
"targetCS": "targetcs",
"blobName": "test2/auth4.gif",
"sourceContainer": "1000files",
"targetContainer": "test1000",
"requestType": "c"
}

###
# single blob copy
POST https://localhost:7179/api/Blob
Expand Down Expand Up @@ -40,7 +54,7 @@ Content-Type: application/json
"sourceContainer": "testdir",
"targetContainer": "5stream",
"requestType": "s",
"sampleSize": 5000
"sampleSize": 500
}


Expand All @@ -59,3 +73,19 @@ Content-Type: application/json
"requestType": "s",
"SampleSize": 5
}

### test aca

POST https://second--80qa8q9.jollyrock-6da7e977.northeurope.azurecontainerapps.io/api/Blob
accept: text/plain
Content-Type: application/json

{
"sourceCS": "sourcecs",
"targetCS": "sourcecs",
"blobName": "https://raw.githubusercontent.com/soferreira/copy-alternatives/main/sample_data/daily.data",
"sourceContainer": "testdir",
"targetContainer": "batchingtest",
"requestType": "s",
"sampleSize": 200
}

0 comments on commit a342e64

Please sign in to comment.