Background Jobs with Rx.NET

As a backend developer you have to implement some kind of periodically running background jobs from time to time. Most widely used tools I see are timers, tasks, semaphores, half dozens of booleans and enums to keep the state. Depending on the complexity of the code, understanding it can be quite adventurous. Alas, almost none of the developers I asked about reactive extensions (Rx.NET) know that they even exist.

In this article:

pg
Pawel Gerr is architect consultant at Thinktecture. He focuses on backends with .NET Core and knows Entity Framework inside out.

If I start talking about reactive extensions (Rx.NET) then usually people either don’t know what it is, or they start thinking about complex systems like processing real time data. Although, sometimes reading an implementation of a “simple background job” using timers and semaphores is more challenging than anything else.

One of the main reasons for not using reactive extension is the lack of practical examples for fairly simple use cases – let’s try to change that.

Use Case: Data Import from FTP

Imagine we have a bunch of suppliers, and we need some data from them, like latest price lists. For this use case we need an importer which downloads the files from FTP, parses them and writes the data into a database.

The requirements are:

  • Perform the import periodically (i.e., every X minutes/hours/days)
  • Perform the import on startup of the application
  • Allow starting the import on-demand (e.g., if someone presses a button or a specific web request comes in)
    • Bonus: Don’t perform the import multiple times if on-demand trigger fires multiple times (e.g., if someone presses the button 5 times)
  • The imports must not overlap (i.e., don’t start a new run before the current run finishes)
  • Don’t let pending imports stack up. For example, when periodic trigger and on-demand trigger fire more often than the actual import can handle in due time.
  • Download the files from FTP in parallel because it is the slowest I/O operation, and each file comes from different FTP server.
    • Bonus: Make degree of parallelism configurable, although, in this case we usually don’t need to restrain the parallelization. Unless the files are kept in memory instead of writing them into a local temp folder.
  • Parse the files as soon as possible after download and in parallel. The degree of parallelism must be configurable because parsing requires more RAM and we want to keep the memory usage increase within acceptable range.
  • Write data to database all at once or in batches. Don’t write data of each file separately because databases prefer a few bigger writes over hundreds of smaller requests.

Implementation of the Importer

In this article we will concentrate on reactive extensions only, i.e., the download from FTP, parsing files and writing data to database will be simulated by Task.Delay() and a few log messages.

Basic Structure

The basic structure of the background job is completely Rx.NET-free and contains typical methods like Start() and Stop(). Furthermore, the class has the method EnqueueImport() which is the on-demand trigger I talked about in previous section. The constructor expects the periodic import interval (importInterval), maximum concurrency of FTP downloads (maxConcurrentDownloads) and parsing files (maxConcurrentParsings).

				
					public class ImporterBackgroundJob : IDisposable
{
   public ImporterBackgroundJob(
      TimeSpan importInterval,
      int maxConcurrentDownloads,
      int maxConcurrentParsings)
   {
      // TODO: create Rx.NET pipeline
   }

   public void Start()
   {
      // TODO: starts the background job
   }

   public void Stop()
   {
      // TODO: stops the background job
   }

   public void EnqueueImport()
   {
      // TODO: starts the import as soon as possible
   }

   public void Dispose()
   {
      // stop/dispose the Rx.NET pipeline
   }
}

				
			

The actual work, like downloading a file, will be delegated to the class Importer. The Importer is immutable and thread-safe. The constructor gets an ImportId for logging purposes. With this id we can conclude the trigger of the current run.

  • Positive numbers (0, 1, 2, etc.): triggered by periodic trigger
  • -1: triggered on startup
  • Negative numbers starting with -2 (-3, -4, etc.): triggered by on-demand trigger
				
					public class Importer
{
   public long ImportId { get; }

   public Importer(long importId)
   {
      ImportId = importId;
   }

   // Fetches FTP url, path and the credentials
   public List<FtpInfo> FetchSupplierFtpInfos()
   {
      Console.WriteLine($"[{ImportId}] Fetching FTP infos.");

      return Enumerable.Range(1, 7)
                       .Select(id => new FtpInfo(id))
                       .ToList();
   }

   public async Task<FtpFile> DownloadFileAsync(FtpInfo ftpInfo, CancellationToken cancellationToken)
   {
      Console.WriteLine($"[{ImportId}] Downloading file from supplier '{ftpInfo.SupplierId}'");
      await Task.Delay(1500, cancellationToken);
      Console.WriteLine($"[{ImportId}]  Downloaded file from supplier '{ftpInfo.SupplierId}'.");

      return new FtpFile(ftpInfo.SupplierId);
   }

   public async Task<Data> ParseAsync(FtpFile file, CancellationToken cancellationToken)
   {
      Console.WriteLine($"[{ImportId}] Parsing file of supplier '{file.SupplierId}'.");
      await Task.Delay(1000, cancellationToken);
      Console.WriteLine($"[{ImportId}]  Parsed file of supplier '{file.SupplierId}'.");

      return new Data(file.SupplierId);
   }

   public async Task SaveToDatabaseAsync(IList<Data> data, CancellationToken cancellationToken)
   {
      Console.WriteLine($"[{ImportId}] Saving data to database. Suppliers: {String.Join(", ", data.Select(d => d.SupplierId))}.");
      await Task.Delay(200, cancellationToken);
      Console.WriteLine($"[{ImportId}]  Saved data to database. Suppliers: {String.Join(", ", data.Select(d => d.SupplierId))}.");
   }
}

				
			

Console Application

The console application has a listener on keypress event to be able start ('s'), stop ('c') the background job, and to trigger an import ('e'). The importer will run every minute and download 5 files in parallel. The maximum degree of parallelism for parsing is 3.

				
					var importerJob = new ImporterBackgroundJob(TimeSpan.FromSeconds(60),
                                            maxConcurrentDownloads: 5,
                                            maxConcurrentParsings: 3);
SetupKeyPressListener(importerJob);

importerJob.Start();

// prevents immediate shutdown of the console application
await Task.Delay(TimeSpan.FromMinutes(5));

static void SetupKeyPressListener(ImporterBackgroundJob importer)
{
   Task.Run(() =>
            {
               while (true)
               {
                  var key = Console.ReadKey();

                  switch (key.KeyChar)
                  {
                     // enqueue
                     case 'e':
                        importer.EnqueueImport();
                        break;

                     // start
                     case 's':
                        importer.Start();
                        break;

                     // stop/exit
                     case 'c':
                        // exit on ctrl+c
                        if (key.Modifiers == ConsoleModifiers.Control)
                           Environment.Exit(0);

                        importer.Stop();
                        break;
                  }
               }
            });
}

				
			

Here is the csproj-file, for the sake of completion.

				
					<Project Sdk="Microsoft.NET.Sdk">

   <PropertyGroup>
      <OutputType>Exe</OutputType>
      <TargetFramework>net7.0</TargetFramework>
      <ImplicitUsings>enable</ImplicitUsings>
      <Nullable>enable</Nullable>
   </PropertyGroup>

   <ItemGroup>
     <PackageReference Include="System.Reactive" Version="6.0.0" />
   </ItemGroup>

</Project>

				
			

Rx.NET Pipeline

Up until now, the code was plain C# doing basic stuff. The implementation of a pipeline will be done in 2 parts. First, we create a pipeline for 3 triggers (periodic, on startup, on-demand) and then a pipeline for the actual import from FTP.

Pipeline: Triggers

The setup of the pipeline is in the constructor because it is not executed until there is a subscriber, similar to a LINQ query without methods like ToList or FirstOrDefault. But these few lines can be moved to Start() as well.

First, we create the _onDemandTrigger which works as the mediator between the method EnqueueImport() and the pipeline. On each call of EnqueueImport() we push a new import-id (for logging purposes) into _onDemandTrigger that passes the value to the pipeline.

The actual _pipeline starts with the periodic trigger Observable.Interval(). The Interval fires an event according to importInverval – in our case every 60 seconds.

The startup trigger is implemented via StartsWith() that fires when the pipeline is activated by subscribing to it. The subscription is made in the method Start().

Please note: If the background job is started and stopped multiple times, then the initial import will be executed on each call of the method Start() and not just once. Use a flag (e.g., bool) or BehaviorSubject to prevent multiple executions.

Last but not least, we have to wire up the on-demand trigger with Merge(). As a bonus, we use Throttle to prevent pushing multiple values down the pipeline if, for example, the user presses the button 5 times in quick succession (within 300ms).

				
					public class ImporterBackgroundJob : IDisposable
{
   private readonly int _maxConcurrentDownloads;
   private readonly int _maxConcurrentParsings;

   private readonly Subject<long> _onDemandTrigger;
   private readonly IObservable<Unit> _pipeline;

   private long _onDemandCounter = -1; // for logging purposes
   private IDisposable? _runningPipeline;

   public ImporterBackgroundJob(
      TimeSpan importInterval,
      int maxConcurrentDownloads,
      int maxConcurrentParsings)
   {
      _maxConcurrentDownloads = maxConcurrentDownloads;
      _maxConcurrentParsings = maxConcurrentParsings;

      _onDemandTrigger = new Subject<long>();

      _pipeline = Observable.Interval(importInterval) // periodic trigger
                            .StartWith(-1)  // triggers immediately when calling "Subscribe"
                            .Merge(_onDemandTrigger   // triggers when calling "EnqueueImport"
                                      // trigger once if we get multiple triggers within 300ms
                                      .Throttle(TimeSpan.FromMilliseconds(300)))
                            .SelectThrottle(i => ImportDataFromFtp(i))
                            .Finally(() => Console.WriteLine("Pipeline terminated"));
   }

   public void Start()
   {
      // starts the background job
      _runningPipeline ??= _pipeline.Subscribe();
   }

   public void Stop()
   {
      // stops the background job
      _runningPipeline?.Dispose();
      _runningPipeline = null;
   }

   public void EnqueueImport()
   {
      // starts the import as soon as possible
      _onDemandTrigger.OnNext(Interlocked.Decrement(ref _onDemandCounter));
   }

   private IObservable<Unit> ImportDataFromFtp(long importId)
   {
      // TODO: pipeline for the actual import
   }

   public void Dispose()
   {
      _onDemandTrigger.Dispose();
      _runningPipeline?.Dispose();
   }
}

				
			

The last step is to call ImportDataFromFtp(). At this point we have several ways to prevent overlapping of multiple imports.

  • Select() + Switch(): Cancels current run and starts a new one. Usually not a requirement.
  • Select() + Concat(): Executes the runs sequentially but pending imports will stack up, if the trigger fire faster than the import can handle them.
  • SelectThrottle(): Custom operator, that discards events from the triggers, if the the import is running already. Alternatively, keeps X events in the pipeline and discards the rest.

I highly recommend writing own operators (extension methods) for reusability.

Besides SelectThrottle I created a few convenience methods for asynchronous methods used in the next section.

				
					public static class ObservableExtensions
{
   public static IObservable<IObservable<TResult>> SelectAsync<TSource, TResult>(
      this IObservable<TSource> source,
      Func<TSource, CancellationToken, Task<TResult>> selector)
   {
      return source.Select(value => Observable.FromAsync(token => selector(value, token)));
   }

   public static IObservable<Unit> SelectManyAsync<TSource>(
      this IObservable<TSource> source,
      Func<TSource, CancellationToken, Task> selector)
   {
      return source.SelectMany(value => Observable.FromAsync(token => selector(value, token)));
   }

   public static IObservable<TResult> SelectThrottle<TSource, TResult>(
      this IObservable<TSource> source,
      Func<TSource, IObservable<TResult>> selector,
      int maxConcurrency = 1,
      bool appendAnotherRunIfEmittedDuringExecution = true)
   {
      if (source == null)
         throw new ArgumentNullException(nameof(source));
      if (selector == null)
         throw new ArgumentNullException(nameof(selector));
      if (maxConcurrency < 1)
         throw new ArgumentOutOfRangeException(nameof(maxConcurrency),
                                               "The concurrency cannot be less than 1.");

      return Observable.Defer(() =>
      {
         var innerConcurrency = appendAnotherRunIfEmittedDuringExecution
                                       && maxConcurrency < Int32.MaxValue
                                   ? maxConcurrency + 1
                                   : maxConcurrency;
         var concurrentCalls = 0;

         return source.Select(v =>
         {
            if (Interlocked.Increment(ref concurrentCalls) <= innerConcurrency)
               return selector(v).Do(DoNoting, () => Interlocked.Decrement(ref concurrentCalls));

            Interlocked.Decrement(ref concurrentCalls);

            return Observable.Empty<TResult>();
         });
      })
     .Merge(maxConcurrency);
   }

   private static void DoNoting<T>(T item)
   {
   }
}

				
			

Pipeline: FTP Import

The pipeline for the actual FTP import can be implemented in multiple ways. We can use Rx.NET all the way or switch to a different tool if it is more appropriate for the task at hand.

Rx.NET all the Way

The pipeline in ImportDataFromFtp() describes every step of the actual import.

  • Create a new instance of Importer in Defer() to streamline the error handling, otherwise we would need a try-catch because our constructor could raise an error. (Although in doesn’t in our demo)
  • Fetching the FTP access data and convert the List to an observable
  • Download files asynchronously and in parallel using provided degree of parallelism (_maxConcurrentDownloads = 5)
  • Parse downloaded files asynchronously and in parallel using provided degree of parallelism (_maxConcurrentParsings = 3)
  • Collect all parsed data. Alternatively, use the operator Buffer to do it in batches. But in this case, we won’t have a single database transaction and could end up with partially imported data – which may or may not be problem.
  • Save all data or batches in the database.
  • (Mediocre) Error handling of all exceptions raised in this “FTP Import”-pipeline.

Depending on the requirements, the error handling could be more fine-grained. For example, if 1 download fails, what do we do next? Do we want to cancel the whole run entirely, or do we want to import the rest because the data of each supplier is independent anyways?

				
					private IObservable<Unit> ImportDataFromFtp(long importId)
{
   // the "Importer" is immutable and thread-safe

   return Observable.Defer(() => Observable.Return(new Importer(importId)))
     .Do(_ => Console.WriteLine($"[{importId}] ==> Starting import"))
     .SelectMany(importer => importer.FetchSupplierFtpInfos()
       .ToObservable()
       .SelectAsync((ftpInfo, token) => importer.DownloadFileAsync(ftpInfo, token))
       .Merge(_maxConcurrentDownloads) // degree of parallelism for downloads: 5
       .SelectAsync((file, token) => importer.ParseAsync(file, token))
       .Merge(_maxConcurrentParsings) // degree of parallelism for parsing: 3
       .ToList()  // collect all results to pass them to SaveToDatabaseAsync all at once
       .Where(d => d.Count > 0)       // ignore empty collections
       .SelectManyAsync((data, token) => importer.SaveToDatabaseAsync(data, token))
      )
     .Do(_ => Console.WriteLine($"[{importId}] <== Import finished"))
     .Catch<Unit, Exception>(ex =>
     {
        Console.WriteLine($"[{importId}] <== Import cancelled due to '{ex.GetType().Name}: {ex.Message}'");
        return Observable.Empty<Unit>();
     });
}
				
			

The Right Tool for the Job

If we don’t need to restrict the degree of parallelism for each step and the previous code sample is too much Rx.NET for you, then you can take the classic approach without loosing the benefits of the “Trigger”-pipeline.

Please note: You can use Task Parallel Library or PLINQ for parallelization of the work as well.

				
					   // "classic" approach
   private IObservable<Unit> ImportDataFromFtp(long importId)
   {
      return Observable.Create<Unit>(async (observer, token) =>
      {
        try
        {
           await ImportDataFromFtpAsync(importId, token);
        }
        catch (Exception ex)
        {
           Console.WriteLine($"[{importId}] <== Import cancelled due to '{ex.GetType().Name}: {ex.Message}'");
        }
        finally
        {
           observer.OnCompleted();
        }

        return Disposable.Empty;
      });
   }

   private static async Task ImportDataFromFtpAsync(long importId, CancellationToken token)
   {
      var importer = new Importer(importId);
      Console.WriteLine($"[{importId}] ==> Starting import");

      var dataTasks = importer.FetchSupplierFtpInfos()
                              .Select(async ftpInfo =>
                              {
                                 var file = await importer.DownloadFileAsync(ftpInfo, token);
                                 return await importer.ParseAsync(file, token);
                              });

      var data = await Task.WhenAll(dataTasks);

      if (data.Length > 0)
         await importer.SaveToDatabaseAsync(data, token);

      Console.WriteLine($"[{importId}] <== Import finished");
   }
				
			

Everything in between the first and second approach is possible as well.

Background Job in Action

Right after starting the console application, we see the output of the initial import. There are always no more than 5 downloads and 3 parsing operations. At the of the run the data of all 7 suppliers are persisted in the database.

				
					[-1] ==> Starting import
[-1] Fetching FTP infos.
[-1] Downloading file from supplier '1'
[-1] Downloading file from supplier '2'
[-1] Downloading file from supplier '3'
[-1] Downloading file from supplier '4'
[-1] Downloading file from supplier '5'
[-1]  Downloaded file from supplier '5'.
[-1]  Downloaded file from supplier '4'.
[-1]  Downloaded file from supplier '3'.
[-1]  Downloaded file from supplier '2'.
[-1]  Downloaded file from supplier '1'.
[-1] Parsing file of supplier '5'.
[-1] Parsing file of supplier '4'.
[-1] Parsing file of supplier '3'.
[-1] Downloading file from supplier '6'
[-1] Downloading file from supplier '7'
[-1]  Parsed file of supplier '3'.
[-1]  Parsed file of supplier '4'.
[-1]  Parsed file of supplier '5'.
[-1] Parsing file of supplier '1'.
[-1] Parsing file of supplier '2'.
[-1]  Downloaded file from supplier '7'.
[-1]  Downloaded file from supplier '6'.
[-1] Parsing file of supplier '7'.
[-1]  Parsed file of supplier '2'.
[-1]  Parsed file of supplier '1'.
[-1] Parsing file of supplier '6'.
[-1]  Parsed file of supplier '7'.
[-1]  Parsed file of supplier '6'.
[-1] Saving data to database. Suppliers: 3, 4, 5, 2, 1, 7, 6.
[-1]  Saved data to database. Suppliers: 3, 4, 5, 2, 1, 7, 6.
[-1] <== Import finished

				
			

If I press the character ‘e’ multiple times within 300ms then just the last keypress is passed by the on-demand trigger down the pipeline. The event with ids -2, -3, -4, -5, -6 are discarded by the operator Throttle.

				
					eeeeee
[-7] ==> Starting import
[-7] Fetching FTP infos.
[-7] Downloading file from supplier '1'
[-7] Downloading file from supplier '2'
...
[-7]  Parsed file of supplier '6'.
[-7]  Parsed file of supplier '7'.
[-7] Saving data to database. Suppliers: 1, 4, 5, 2, 3, 6, 7.
[-7]  Saved data to database. Suppliers: 1, 4, 5, 2, 3, 6, 7.
[-7] <== Import finished

				
			

60 seconds after the start of the application the periodic trigger kicks in and performs another import.

				
					[0] ==> Starting import
[0] Fetching FTP infos.
[0] Downloading file from supplier '1'
[0] Downloading file from supplier '2'
...
[0]  Parsed file of supplier '6'.
[0]  Parsed file of supplier '7'.
[0] Saving data to database. Suppliers: 3, 5, 4, 2, 1, 6, 7.
[0]  Saved data to database. Suppliers: 3, 5, 4, 2, 1, 6, 7.
[0] <== Import finished

				
			

If I press 'e' 2 times during an import with some delay in-between, then the import with id [-2] waits for the current one  with id [1] to finish. The second keypress, which would lead to a run with id [-3], gets discarded by SelectThrottle.

				
					[1] ==> Starting import
[1] Fetching FTP infos.
[1] Downloading file from supplier '1'
...
[1] Downloading file from supplier '5'
e                                        // first press (on-demand trigger)
[1]  Downloaded file from supplier '2'.
...
[1] Parsing file of supplier '5'.
e                                        // second press (on-demand trigger)
[1]  Downloaded file from supplier '7'.
...
[1] Saving data to database. Suppliers: 2, 3, 4, 1, 5, 7, 6.
[1]  Saved data to database. Suppliers: 2, 3, 4, 1, 5, 7, 6.
[1] <== Import finished
[-2] ==> Starting import
[-2] Fetching FTP infos.
[-2] Downloading file from supplier '1'
...
[-2]  Saved data to database. Suppliers: 2, 3, 4, 1, 5, 7, 6.
[-2] <== Import finished

				
			

Summary

In this article we built 2 pipelines with reactive extensions. The first one is a good replacement for the “magic” with timers, semaphores and so on. The second one can be implemented in several ways. Choose the tool depending on your demands and preferences.

I left out one important aspect – testing. The Rx.NET pipelines can be tested using TestScheduler. In order to that we must pass an instance of IScheduler to all methods that accept it, i.e., testing requires some preparation – but that’s for another day.

Free
Newsletter

Current articles, screencasts and interviews by our experts

Don’t miss any content on Angular, .NET Core, Blazor, Azure, and Kubernetes and sign up for our free monthly dev newsletter.

EN Newsletter Anmeldung (#7)
Related Articles
AI
sg
One of the more pragmatic ways to get going on the current AI hype, and to get some value out of it, is by leveraging semantic search. This is, in itself, a relatively simple concept: You have a bunch of documents and want to find the correct one based on a given query. The semantic part now allows you to find the correct document based on the meaning of its contents, in contrast to simply finding words or parts of words in it like we usually do with lexical search. In our last projects, we gathered some experience with search bots, and with this article, I'd love to share our insights with you.
17.05.2024
Angular
sl_300x300
If you previously wanted to integrate view transitions into your Angular application, this was only possible in a very cumbersome way that needed a lot of detailed knowledge about Angular internals. Now, Angular 17 introduced a feature to integrate the View Transition API with the router. In this two-part series, we will look at how to leverage the feature for route transitions and how we could use it for single-page animations.
15.04.2024
.NET
kp_300x300
.NET 8 brings Native AOT to ASP.NET Core, but many frameworks and libraries rely on unbound reflection internally and thus cannot support this scenario yet. This is true for ORMs, too: EF Core and Dapper will only bring full support for Native AOT in later releases. In this post, we will implement a database access layer with Sessions using the Humble Object pattern to get a similar developer experience. We will use Npgsql as a plain ADO.NET provider targeting PostgreSQL.
15.11.2023