Skip to content

Commit

Permalink
Changes DataStores back to Scoped (#562)
Browse files Browse the repository at this point in the history
* Changes DataStores back to Scoped - moves lifetime ownership to highest level components
* Adds Func<IScoped<ISearchService>> resolution
* Adds Func<IScoped<IFhirDataStore>>
  • Loading branch information
brendankowitz authored Jun 28, 2019
1 parent c8cfcd0 commit a433fbd
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using Microsoft.Health.Extensions.DependencyInjection;
using NSubstitute;

namespace Microsoft.Health.Fhir.Core.UnitTests.Extensions
{
public static class CreateMockedScopeExtensions
{
public static IScoped<T> CreateMockScope<T>(this T obj)
{
var scope = Substitute.For<IScoped<T>>();
scope.Value.Returns(obj);
return scope;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Features.SecretStore;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.Tests.Common;
using NSubstitute;
using Xunit;
Expand Down Expand Up @@ -69,10 +70,10 @@ public ExportJobTaskTests()
_resourceToByteArraySerializer.Serialize(Arg.Any<ResourceWrapper>()).Returns(x => Encoding.UTF8.GetBytes(x.ArgAt<ResourceWrapper>(0).ResourceId));

_exportJobTask = new ExportJobTask(
_fhirOperationDataStore,
() => _fhirOperationDataStore.CreateMockScope(),
_secretStore,
Options.Create(_exportJobConfiguration),
_searchService,
() => _searchService.CreateMockScope(),
_resourceToByteArraySerializer,
_exportDestinationClientFactory,
NullLogger<ExportJobTask>.Instance);
Expand Down Expand Up @@ -357,10 +358,10 @@ public async Task GivenAnExportJobToResume_WhenExecuted_ThenItShouldExportAllRec
_inMemoryDestinationClient = new InMemoryExportDestinationClient();
_exportDestinationClientFactory.Create("in-memory").Returns(_inMemoryDestinationClient);
var secondExportJobTask = new ExportJobTask(
_fhirOperationDataStore,
() => _fhirOperationDataStore.CreateMockScope(),
_secretStore,
Options.Create(_exportJobConfiguration),
_searchService,
() => _searchService.CreateMockScope(),
_resourceToByteArraySerializer,
_exportDestinationClientFactory,
NullLogger<ExportJobTask>.Instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Export;
Expand Down Expand Up @@ -42,9 +43,11 @@ public ExportJobWorkerTests()
_exportJobConfiguration.JobPollingFrequency = DefaultJobPollingFrequency;

_exportJobTaskFactory().Returns(_task);
var scopedOperationDataStore = Substitute.For<IScoped<IFhirOperationDataStore>>();
scopedOperationDataStore.Value.Returns(_fhirOperationDataStore);

_exportJobWorker = new ExportJobWorker(
_fhirOperationDataStore,
() => scopedOperationDataStore,
Options.Create(_exportJobConfiguration),
_exportJobTaskFactory,
NullLogger<ExportJobWorker>.Instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using EnsureThat;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Operations.Export.ExportDestinationClient;
using Microsoft.Health.Fhir.Core.Features.Operations.Export.Models;
Expand All @@ -23,10 +24,10 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Export
{
public class ExportJobTask : IExportJobTask
{
private readonly IFhirOperationDataStore _fhirOperationDataStore;
private readonly Func<IScoped<IFhirOperationDataStore>> _fhirOperationDataStoreFactory;
private readonly ISecretStore _secretStore;
private readonly ExportJobConfiguration _exportJobConfiguration;
private readonly ISearchService _searchService;
private readonly Func<IScoped<ISearchService>> _searchServiceFactory;
private readonly IResourceToByteArraySerializer _resourceToByteArraySerializer;
private readonly IExportDestinationClientFactory _exportDestinationClientFactory;
private readonly ILogger _logger;
Expand All @@ -41,26 +42,26 @@ public class ExportJobTask : IExportJobTask
private IExportDestinationClient _exportDestinationClient;

public ExportJobTask(
IFhirOperationDataStore fhirOperationDataStore,
Func<IScoped<IFhirOperationDataStore>> fhirOperationDataStoreFactory,
ISecretStore secretStore,
IOptions<ExportJobConfiguration> exportJobConfiguration,
ISearchService searchService,
Func<IScoped<ISearchService>> searchServiceFactory,
IResourceToByteArraySerializer resourceToByteArraySerializer,
IExportDestinationClientFactory exportDestinationClientFactory,
ILogger<ExportJobTask> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStore, nameof(fhirOperationDataStore));
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(secretStore, nameof(secretStore));
EnsureArg.IsNotNull(exportJobConfiguration?.Value, nameof(exportJobConfiguration));
EnsureArg.IsNotNull(searchService, nameof(searchService));
EnsureArg.IsNotNull(searchServiceFactory, nameof(searchServiceFactory));
EnsureArg.IsNotNull(resourceToByteArraySerializer, nameof(resourceToByteArraySerializer));
EnsureArg.IsNotNull(exportDestinationClientFactory, nameof(exportDestinationClientFactory));
EnsureArg.IsNotNull(logger, nameof(logger));

_fhirOperationDataStore = fhirOperationDataStore;
_fhirOperationDataStoreFactory = fhirOperationDataStoreFactory;
_secretStore = secretStore;
_exportJobConfiguration = exportJobConfiguration.Value;
_searchService = searchService;
_searchServiceFactory = searchServiceFactory;
_resourceToByteArraySerializer = resourceToByteArraySerializer;
_exportDestinationClientFactory = exportDestinationClientFactory;
_logger = logger;
Expand Down Expand Up @@ -104,8 +105,14 @@ public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETa
// 2. There is no continuation token but the page is 0, which means it's the initial export.
while (progress.ContinuationToken != null || progress.Page == 0)
{
SearchResult searchResult;

// Search and process the results.
SearchResult searchResult = await _searchService.SearchAsync(_exportJobRecord.ResourceType, queryParameters, cancellationToken);
using (IScoped<ISearchService> searchService = _searchServiceFactory())
{
searchResult = await searchService.Value.SearchAsync(_exportJobRecord.ResourceType, queryParameters, cancellationToken);
}

await ProcessSearchResults(searchResult.Results, currentBatchId, cancellationToken);

if (searchResult.ContinuationToken == null)
Expand Down Expand Up @@ -179,10 +186,13 @@ private async Task UpdateAndCommitJobStatus(OperationStatus operationStatus, boo

private async Task UpdateAndCommitJobRecord(ExportJobRecord jobRecord, CancellationToken cancellationToken)
{
ExportJobOutcome updatedExportJobOutcome = await _fhirOperationDataStore.UpdateExportJobAsync(jobRecord, _weakETag, cancellationToken);
using (IScoped<IFhirOperationDataStore> fhirOperationDataStore = _fhirOperationDataStoreFactory())
{
ExportJobOutcome updatedExportJobOutcome = await fhirOperationDataStore.Value.UpdateExportJobAsync(jobRecord, _weakETag, cancellationToken);

_exportJobRecord = updatedExportJobOutcome.JobRecord;
_weakETag = updatedExportJobOutcome.ETag;
_exportJobRecord = updatedExportJobOutcome.JobRecord;
_weakETag = updatedExportJobOutcome.ETag;
}
}

// Get destination info from secret store, create appropriate export client and connect to destination.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using EnsureThat;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Operations.Export.Models;

Expand All @@ -21,19 +22,19 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Export
/// </summary>
public class ExportJobWorker
{
private readonly IFhirOperationDataStore _fhirOperationDataStore;
private readonly Func<IScoped<IFhirOperationDataStore>> _fhirOperationDataStoreFactory;
private readonly ExportJobConfiguration _exportJobConfiguration;
private readonly Func<IExportJobTask> _exportJobTaskFactory;
private readonly ILogger _logger;

public ExportJobWorker(IFhirOperationDataStore fhirOperationDataStore, IOptions<ExportJobConfiguration> exportJobConfiguration, Func<IExportJobTask> exportJobTaskFactory, ILogger<ExportJobWorker> logger)
public ExportJobWorker(Func<IScoped<IFhirOperationDataStore>> fhirOperationDataStoreFactory, IOptions<ExportJobConfiguration> exportJobConfiguration, Func<IExportJobTask> exportJobTaskFactory, ILogger<ExportJobWorker> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStore, nameof(fhirOperationDataStore));
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(exportJobConfiguration?.Value, nameof(exportJobConfiguration));
EnsureArg.IsNotNull(exportJobTaskFactory, nameof(exportJobTaskFactory));
EnsureArg.IsNotNull(logger, nameof(logger));

_fhirOperationDataStore = fhirOperationDataStore;
_fhirOperationDataStoreFactory = fhirOperationDataStoreFactory;
_exportJobConfiguration = exportJobConfiguration.Value;
_exportJobTaskFactory = exportJobTaskFactory;
_logger = logger;
Expand All @@ -53,12 +54,15 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
// Get list of available jobs.
if (runningTasks.Count < _exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowed)
{
IReadOnlyCollection<ExportJobOutcome> jobs = await _fhirOperationDataStore.AcquireExportJobsAsync(
_exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowed,
_exportJobConfiguration.JobHeartbeatTimeoutThreshold,
cancellationToken);
using (IScoped<IFhirOperationDataStore> store = _fhirOperationDataStoreFactory())
{
IReadOnlyCollection<ExportJobOutcome> jobs = await store.Value.AcquireExportJobsAsync(
_exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowed,
_exportJobConfiguration.JobHeartbeatTimeoutThreshold,
cancellationToken);

runningTasks.AddRange(jobs.Select(job => _exportJobTaskFactory().ExecuteAsync(job.JobRecord, job.ETag, cancellationToken)));
runningTasks.AddRange(jobs.Select(job => _exportJobTaskFactory().ExecuteAsync(job.JobRecord, job.ETag, cancellationToken)));
}
}

await Task.Delay(_exportJobConfiguration.JobPollingFrequency, cancellationToken);
Expand Down
Loading

0 comments on commit a433fbd

Please sign in to comment.