When I bring up Reactive Extensions (Rx.NET), people usually either don’t know what it is, or they immediately think of complex systems like real-time data processing. Yet reading an implementation of a “simple background job” built with timers and semaphores is sometimes more challenging than anything else.
One of the main reasons for not using Reactive Extensions is the lack of practical examples for fairly simple use cases – let’s try to change that.
Demo: Source code on GitHub
Use Case: Data Import from FTP
Imagine we have a bunch of suppliers and we need some data from them, such as the latest price lists. For this use case we need an importer which downloads the files from FTP, parses them, and writes the data into a database.
The requirements are:
- Perform the import periodically (i.e., every X minutes/hours/days)
- Perform the import on startup of the application
- Allow starting the import on-demand (e.g., if someone presses a button or a specific web request comes in)
- Bonus: Don’t perform the import multiple times if the on-demand trigger fires multiple times (e.g., if someone presses the button 5 times)
- The imports must not overlap (i.e., don’t start a new run before the current run finishes)
- Don’t let pending imports stack up, e.g., when the periodic and on-demand triggers fire more often than the actual import can process in time.
- Download the files from FTP in parallel because this is the slowest I/O operation and each file comes from a different FTP server.
- Bonus: Make the degree of parallelism configurable, although in this case we usually don’t need to restrain the parallelization – unless the files are kept in memory instead of being written to a local temp folder.
- Parse the files as soon as possible after download, and in parallel. The degree of parallelism must be configurable because parsing requires more RAM and we want to keep the increase in memory usage within an acceptable range.
- Write the data to the database all at once or in batches. Don’t write the data of each file separately because databases prefer a few bigger writes over hundreds of smaller requests.
Implementation of the Importer
In this article we will concentrate on the Reactive Extensions only, i.e., the download from FTP, the parsing of files, and the writing of data to the database are simulated by Task.Delay() and a few log messages.
Basic Structure
The basic structure of the background job is completely Rx.NET-free and contains typical methods like Start() and Stop(). Furthermore, the class has the method EnqueueImport(), which is the on-demand trigger I talked about in the previous section. The constructor expects the periodic import interval (importInterval), the maximum concurrency of FTP downloads (maxConcurrentDownloads), and the maximum concurrency of file parsing (maxConcurrentParsings).
public class ImporterBackgroundJob : IDisposable
{
    public ImporterBackgroundJob(
        TimeSpan importInterval,
        int maxConcurrentDownloads,
        int maxConcurrentParsings)
    {
        // TODO: create Rx.NET pipeline
    }

    public void Start()
    {
        // TODO: starts the background job
    }

    public void Stop()
    {
        // TODO: stops the background job
    }

    public void EnqueueImport()
    {
        // TODO: starts the import as soon as possible
    }

    public void Dispose()
    {
        // stop/dispose the Rx.NET pipeline
    }
}
The actual work, like downloading a file, is delegated to the class Importer. The Importer is immutable and thread-safe. The constructor gets an ImportId for logging purposes; from this id we can infer which trigger started the current run.
- Non-negative numbers (0, 1, 2, etc.): triggered by the periodic trigger
- -1: triggered on startup
- Negative numbers from -2 downwards (-2, -3, -4, etc.): triggered by the on-demand trigger
public class Importer
{
    public long ImportId { get; }

    public Importer(long importId)
    {
        ImportId = importId;
    }

    // Fetches FTP url, path and the credentials
    public List<FtpInfo> FetchSupplierFtpInfos()
    {
        Console.WriteLine($"[{ImportId}] Fetching FTP infos.");

        return Enumerable.Range(1, 7)
                         .Select(id => new FtpInfo(id))
                         .ToList();
    }

    public async Task<FtpFile> DownloadFileAsync(FtpInfo ftpInfo, CancellationToken cancellationToken)
    {
        Console.WriteLine($"[{ImportId}] Downloading file from supplier '{ftpInfo.SupplierId}'");
        await Task.Delay(1500, cancellationToken);
        Console.WriteLine($"[{ImportId}] Downloaded file from supplier '{ftpInfo.SupplierId}'.");

        return new FtpFile(ftpInfo.SupplierId);
    }

    public async Task<Data> ParseAsync(FtpFile file, CancellationToken cancellationToken)
    {
        Console.WriteLine($"[{ImportId}] Parsing file of supplier '{file.SupplierId}'.");
        await Task.Delay(1000, cancellationToken);
        Console.WriteLine($"[{ImportId}] Parsed file of supplier '{file.SupplierId}'.");

        return new Data(file.SupplierId);
    }

    public async Task SaveToDatabaseAsync(IList<Data> data, CancellationToken cancellationToken)
    {
        Console.WriteLine($"[{ImportId}] Saving data to database. Suppliers: {String.Join(", ", data.Select(d => d.SupplierId))}.");
        await Task.Delay(200, cancellationToken);
        Console.WriteLine($"[{ImportId}] Saved data to database. Suppliers: {String.Join(", ", data.Select(d => d.SupplierId))}.");
    }
}
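The small data types FtpInfo, FtpFile and Data are omitted here for brevity. A minimal sketch of what they could look like – the actual definitions are in the demo repository on GitHub:

// placeholder types; the real FtpInfo also carries url, path and credentials
public record FtpInfo(int SupplierId);
public record FtpFile(int SupplierId);
public record Data(int SupplierId);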
Console Application
The console application has a keypress listener so we can start ('s') and stop ('c') the background job, and trigger an import ('e'). The importer will run every minute and download 5 files in parallel. The maximum degree of parallelism for parsing is 3.
var importerJob = new ImporterBackgroundJob(TimeSpan.FromSeconds(60),
                                            maxConcurrentDownloads: 5,
                                            maxConcurrentParsings: 3);

SetupKeyPressListener(importerJob);

importerJob.Start();

// prevents immediate shutdown of the console application
await Task.Delay(TimeSpan.FromMinutes(5));

static void SetupKeyPressListener(ImporterBackgroundJob importer)
{
    Task.Run(() =>
    {
        while (true)
        {
            var key = Console.ReadKey();

            switch (key.KeyChar)
            {
                // enqueue
                case 'e':
                    importer.EnqueueImport();
                    break;

                // start
                case 's':
                    importer.Start();
                    break;

                // stop/exit
                case 'c':
                    // exit on ctrl+c
                    if (key.Modifiers == ConsoleModifiers.Control)
                        Environment.Exit(0);

                    importer.Stop();
                    break;
            }
        }
    });
}
Here is the csproj-file, for the sake of completeness (the System.Reactive package reference and its version are assumed, since Rx.NET is not part of the base framework):

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net7.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <!-- assumed: Rx.NET package reference -->
    <PackageReference Include="System.Reactive" Version="5.0.0" />
  </ItemGroup>

</Project>
Rx.NET Pipeline
Up until now, the code was plain C# doing basic stuff. The pipeline will be implemented in two parts: first we create a pipeline for the 3 triggers (periodic, on startup, on-demand), and then a pipeline for the actual import from FTP.
Pipeline: Triggers
The setup of the pipeline happens in the constructor because the pipeline is not executed until there is a subscriber – similar to a LINQ query without methods like ToList or FirstOrDefault. But these few lines could be moved to Start() as well.
First, we create the _onDemandTrigger, which works as the mediator between the method EnqueueImport() and the pipeline. On each call of EnqueueImport() we push a new import-id (for logging purposes) into _onDemandTrigger, which passes the value to the pipeline.
The actual _pipeline starts with the periodic trigger Observable.Interval(). The Interval fires an event according to importInterval – in our case every 60 seconds.
The startup trigger is implemented via StartWith(), which fires when the pipeline is activated by subscribing to it. The subscription is made in the method Start().
Please note: If the background job is started and stopped multiple times, then the initial import will be executed on each call of the method Start() and not just once. Use a flag (e.g., bool) or a BehaviorSubject to prevent multiple executions.
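A minimal sketch of the flag-based variant: build the trigger part inside Observable.Defer() so the decision is made at subscribe-time (the variable names are illustrative):

// run the startup import only for the first subscription
var startedOnce = false;
var triggers = Observable.Defer(() =>
{
    var interval = Observable.Interval(importInterval);

    if (startedOnce)
        return interval;

    startedOnce = true;
    return interval.StartWith(-1L);
});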
Last but not least, we have to wire up the on-demand trigger with Merge(). As a bonus, we use Throttle to prevent pushing multiple values down the pipeline if, for example, the user presses the button 5 times in quick succession (within 300ms).
public class ImporterBackgroundJob : IDisposable
{
    private readonly int _maxConcurrentDownloads;
    private readonly int _maxConcurrentParsings;
    private readonly Subject<long> _onDemandTrigger;
    private readonly IObservable<Unit> _pipeline;

    private long _onDemandCounter = -1; // for logging purposes
    private IDisposable? _runningPipeline;

    public ImporterBackgroundJob(
        TimeSpan importInterval,
        int maxConcurrentDownloads,
        int maxConcurrentParsings)
    {
        _maxConcurrentDownloads = maxConcurrentDownloads;
        _maxConcurrentParsings = maxConcurrentParsings;

        _onDemandTrigger = new Subject<long>();
        _pipeline = Observable.Interval(importInterval) // periodic trigger
            .StartWith(-1)                              // triggers immediately when calling "Subscribe"
            .Merge(_onDemandTrigger                     // triggers when calling "EnqueueImport"
                // trigger once if we get multiple triggers within 300ms
                .Throttle(TimeSpan.FromMilliseconds(300)))
            .SelectThrottle(i => ImportDataFromFtp(i))
            .Finally(() => Console.WriteLine("Pipeline terminated"));
    }

    public void Start()
    {
        // starts the background job
        _runningPipeline ??= _pipeline.Subscribe();
    }

    public void Stop()
    {
        // stops the background job
        _runningPipeline?.Dispose();
        _runningPipeline = null;
    }

    public void EnqueueImport()
    {
        // starts the import as soon as possible
        _onDemandTrigger.OnNext(Interlocked.Decrement(ref _onDemandCounter));
    }

    private IObservable<Unit> ImportDataFromFtp(long importId)
    {
        // TODO: pipeline for the actual import
    }

    public void Dispose()
    {
        _onDemandTrigger.Dispose();
        _runningPipeline?.Dispose();
    }
}
The last step is to call ImportDataFromFtp(). At this point we have several ways to prevent overlapping of multiple imports:
- Select() + Switch(): cancels the current run and starts a new one. Usually not a requirement.
- Select() + Concat(): executes the runs sequentially, but pending imports will stack up if the triggers fire faster than the import can handle them.
- SelectThrottle(): a custom operator that discards events from the triggers if the import is running already. Alternatively, it keeps X events in the pipeline and discards the rest.
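For comparison, the two built-in alternatives would look roughly like this, assuming triggers stands for the merged trigger observable from the snippet above:

// Switch: a new trigger event cancels the currently running import
_pipeline = triggers.Select(i => ImportDataFromFtp(i)).Switch();

// Concat: runs are queued and executed strictly one after another
_pipeline = triggers.Select(i => ImportDataFromFtp(i)).Concat();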
I highly recommend writing your own operators (extension methods) for reusability. Besides SelectThrottle, I created a few convenience methods for asynchronous calls, which are used in the next section.
public static class ObservableExtensions
{
    public static IObservable<IObservable<TResult>> SelectAsync<T, TResult>(
        this IObservable<T> source,
        Func<T, CancellationToken, Task<TResult>> selector)
    {
        return source.Select(value => Observable.FromAsync(token => selector(value, token)));
    }

    public static IObservable<Unit> SelectManyAsync<T>(
        this IObservable<T> source,
        Func<T, CancellationToken, Task> selector)
    {
        return source.SelectMany(value => Observable.FromAsync(token => selector(value, token)));
    }

    public static IObservable<TResult> SelectThrottle<T, TResult>(
        this IObservable<T> source,
        Func<T, IObservable<TResult>> selector,
        int maxConcurrency = 1,
        bool appendAnotherRunIfEmittedDuringExecution = true)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        if (selector == null)
            throw new ArgumentNullException(nameof(selector));
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency),
                                                  "The concurrency cannot be less than 1.");

        return Observable.Defer(() =>
                         {
                             var innerConcurrency = appendAnotherRunIfEmittedDuringExecution
                                                    && maxConcurrency < Int32.MaxValue
                                 ? maxConcurrency + 1
                                 : maxConcurrency;
                             var concurrentCalls = 0;

                             return source.Select(v =>
                             {
                                 if (Interlocked.Increment(ref concurrentCalls) <= innerConcurrency)
                                     return selector(v).Do(DoNothing, () => Interlocked.Decrement(ref concurrentCalls));

                                 Interlocked.Decrement(ref concurrentCalls);
                                 return Observable.Empty<TResult>();
                             });
                         })
                         .Merge(maxConcurrency);
    }

    private static void DoNothing<T>(T item)
    {
    }
}
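To make the defaults concrete: with maxConcurrency = 1 and appendAnotherRunIfEmittedDuringExecution = true, the inner limit becomes 2, so one import can run, one can wait in line, and every further trigger event is discarded. A minimal usage sketch (the subject is illustrative):

var trigger = new Subject<long>();

// at most 1 running import plus 1 queued follow-up; further events are dropped
trigger.SelectThrottle(id => ImportDataFromFtp(id))
       .Subscribe();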
Pipeline: FTP Import
The pipeline for the actual FTP import can be implemented in multiple ways. We can use Rx.NET all the way or switch to a different tool if it is more appropriate for the task at hand.
Rx.NET all the Way
The pipeline in ImportDataFromFtp() describes every step of the actual import:
- Create a new instance of Importer in Defer() to streamline the error handling; otherwise we would need a try-catch because our constructor could raise an error (although it doesn’t in our demo).
- Fetch the FTP access data and convert the List<FtpInfo> to an observable.
- Download the files asynchronously and in parallel using the provided degree of parallelism (_maxConcurrentDownloads = 5).
- Parse the downloaded files asynchronously and in parallel using the provided degree of parallelism (_maxConcurrentParsings = 3).
- Collect all parsed data. Alternatively, use the operator Buffer to do it in batches. But in that case we won’t have a single database transaction and could end up with partially imported data – which may or may not be a problem.
- Save all data, or the batches, to the database.
- (Mediocre) error handling of all exceptions raised in this “FTP Import”-pipeline.

Depending on the requirements, the error handling could be more fine-grained (see the sketch after the following code block). For example, if 1 download fails, what do we do next? Do we want to cancel the whole run entirely, or do we want to import the rest because the data of each supplier is independent anyway?
private IObservable<Unit> ImportDataFromFtp(long importId)
{
    // the "Importer" is immutable and thread-safe
    return Observable.Defer(() => Observable.Return(new Importer(importId)))
        .Do(_ => Console.WriteLine($"[{importId}] ==> Starting import"))
        .SelectMany(importer => importer.FetchSupplierFtpInfos()
            .ToObservable()
            .SelectAsync((ftpInfo, token) => importer.DownloadFileAsync(ftpInfo, token))
            .Merge(_maxConcurrentDownloads) // degree of parallelism for downloads: 5
            .SelectAsync((file, token) => importer.ParseAsync(file, token))
            .Merge(_maxConcurrentParsings)  // degree of parallelism for parsing: 3
            .ToList()                       // collect all results to pass them to SaveToDatabaseAsync all at once
            .Where(d => d.Count > 0)        // ignore empty collections
            .SelectManyAsync((data, token) => importer.SaveToDatabaseAsync(data, token))
        )
        .Do(_ => Console.WriteLine($"[{importId}] <== Import finished"))
        .Catch((Exception ex) =>
        {
            Console.WriteLine($"[{importId}] <== Import cancelled due to '{ex.GetType().Name}: {ex.Message}'");
            return Observable.Empty<Unit>();
        });
}
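As a sketch of the more fine-grained error handling mentioned above: SelectAsync emits inner observables, so a failed download can be swallowed per supplier before Merge instead of cancelling the whole run. These lines are a drop-in replacement for the download step (logging omitted):

.SelectAsync((ftpInfo, token) => importer.DownloadFileAsync(ftpInfo, token))
// skip the failed supplier and continue with the rest
.Select(download => download.Catch((Exception _) => Observable.Empty<FtpFile>()))
.Merge(_maxConcurrentDownloads)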
The Right Tool for the Job
If we don’t need to restrict the degree of parallelism for each step and the previous code sample is too much Rx.NET for you, then you can take the classic approach without losing the benefits of the “Trigger”-pipeline.
Please note: You can use the Task Parallel Library or PLINQ for parallelization of the work as well (see the sketch after the next code block).
// "classic" approach
private IObservable ImportDataFromFtp(long importId)
{
return Observable.Create(async (observer, token) =>
{
try
{
await ImportDataFromFtpAsync(importId, token);
}
catch (Exception ex)
{
Console.WriteLine($"[{importId}] <== Import cancelled due to '{ex.GetType().Name}: {ex.Message}'");
}
finally
{
observer.OnCompleted();
}
return Disposable.Empty;
});
}
private static async Task ImportDataFromFtpAsync(long importId, CancellationToken token)
{
    var importer = new Importer(importId);

    Console.WriteLine($"[{importId}] ==> Starting import");

    var dataTasks = importer.FetchSupplierFtpInfos()
        .Select(async ftpInfo =>
        {
            var file = await importer.DownloadFileAsync(ftpInfo, token);
            return await importer.ParseAsync(file, token);
        });

    var data = await Task.WhenAll(dataTasks);

    if (data.Length > 0)
        await importer.SaveToDatabaseAsync(data, token);

    Console.WriteLine($"[{importId}] <== Import finished");
}
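If a concurrency limit is needed in the classic approach after all, Parallel.ForEachAsync (available since .NET 6) can replace the Task.WhenAll part. A rough sketch with a hypothetical helper; note that it limits download and parsing together, not separately:

private static async Task ImportWithLimitAsync(Importer importer, CancellationToken token)
{
    var results = new ConcurrentBag<Data>();
    var options = new ParallelOptions { MaxDegreeOfParallelism = 5, CancellationToken = token };

    await Parallel.ForEachAsync(importer.FetchSupplierFtpInfos(), options, async (ftpInfo, ct) =>
    {
        var file = await importer.DownloadFileAsync(ftpInfo, ct);
        results.Add(await importer.ParseAsync(file, ct));
    });

    if (!results.IsEmpty)
        await importer.SaveToDatabaseAsync(results.ToList(), token);
}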
Everything in between the first and second approach is possible as well.
Background Job in Action
Right after starting the console application, we see the output of the initial import. There are never more than 5 concurrent downloads and 3 concurrent parsing operations. At the end of the run, the data of all 7 suppliers is persisted in the database.
[-1] ==> Starting import
[-1] Fetching FTP infos.
[-1] Downloading file from supplier '1'
[-1] Downloading file from supplier '2'
[-1] Downloading file from supplier '3'
[-1] Downloading file from supplier '4'
[-1] Downloading file from supplier '5'
[-1] Downloaded file from supplier '5'.
[-1] Downloaded file from supplier '4'.
[-1] Downloaded file from supplier '3'.
[-1] Downloaded file from supplier '2'.
[-1] Downloaded file from supplier '1'.
[-1] Parsing file of supplier '5'.
[-1] Parsing file of supplier '4'.
[-1] Parsing file of supplier '3'.
[-1] Downloading file from supplier '6'
[-1] Downloading file from supplier '7'
[-1] Parsed file of supplier '3'.
[-1] Parsed file of supplier '4'.
[-1] Parsed file of supplier '5'.
[-1] Parsing file of supplier '1'.
[-1] Parsing file of supplier '2'.
[-1] Downloaded file from supplier '7'.
[-1] Downloaded file from supplier '6'.
[-1] Parsing file of supplier '7'.
[-1] Parsed file of supplier '2'.
[-1] Parsed file of supplier '1'.
[-1] Parsing file of supplier '6'.
[-1] Parsed file of supplier '7'.
[-1] Parsed file of supplier '6'.
[-1] Saving data to database. Suppliers: 3, 4, 5, 2, 1, 7, 6.
[-1] Saved data to database. Suppliers: 3, 4, 5, 2, 1, 7, 6.
[-1] <== Import finished
If I press the character ‘e’ multiple times within 300ms, then just the last keypress is passed down the pipeline by the on-demand trigger. The events with the ids -2, -3, -4, -5 and -6 are discarded by the operator Throttle.
eeeeee
[-7] ==> Starting import
[-7] Fetching FTP infos.
[-7] Downloading file from supplier '1'
[-7] Downloading file from supplier '2'
...
[-7] Parsed file of supplier '6'.
[-7] Parsed file of supplier '7'.
[-7] Saving data to database. Suppliers: 1, 4, 5, 2, 3, 6, 7.
[-7] Saved data to database. Suppliers: 1, 4, 5, 2, 3, 6, 7.
[-7] <== Import finished
60 seconds after the start of the application the periodic trigger kicks in and performs another import.
[0] ==> Starting import
[0] Fetching FTP infos.
[0] Downloading file from supplier '1'
[0] Downloading file from supplier '2'
...
[0] Parsed file of supplier '6'.
[0] Parsed file of supplier '7'.
[0] Saving data to database. Suppliers: 3, 5, 4, 2, 1, 6, 7.
[0] Saved data to database. Suppliers: 3, 5, 4, 2, 1, 6, 7.
[0] <== Import finished
If I press 'e' 2 times during an import with some delay in-between, then the import with id [-2] waits for the current one with id [1] to finish. The second keypress, which would lead to a run with id [-3], gets discarded by SelectThrottle.
[1] ==> Starting import
[1] Fetching FTP infos.
[1] Downloading file from supplier '1'
...
[1] Downloading file from supplier '5'
e // first press (on-demand trigger)
[1] Downloaded file from supplier '2'.
...
[1] Parsing file of supplier '5'.
e // second press (on-demand trigger)
[1] Downloaded file from supplier '7'.
...
[1] Saving data to database. Suppliers: 2, 3, 4, 1, 5, 7, 6.
[1] Saved data to database. Suppliers: 2, 3, 4, 1, 5, 7, 6.
[1] <== Import finished
[-2] ==> Starting import
[-2] Fetching FTP infos.
[-2] Downloading file from supplier '1'
...
[-2] Saved data to database. Suppliers: 2, 3, 4, 1, 5, 7, 6.
[-2] <== Import finished
Summary
In this article we built two pipelines with Reactive Extensions. The first one is a good replacement for the usual “magic” with timers, semaphores and so on. The second one can be implemented in several ways – choose the tool depending on your demands and preferences.
I left out one important aspect – testing. Rx.NET pipelines can be tested using the TestScheduler. In order to do that, we must pass an instance of IScheduler to all methods that accept one, i.e., testing requires some preparation – but that’s for another day.