In database systems, you need a way to know what has happened, as this allows you to recover from any issues. MongoDB is no different and has an operational log (oplog) for recording all operations that modify the database.
But if you want to be aware of changes to the database from your application, such as inserting or updating new documents, you don’t want to have to mess around with the oplog. So for this reason, back in MongoDB 6.0, we introduced Change Streams. This allows you to easily access the changes in real-time from your applications. It is available in multiple programming languages and that includes C#.
So in this tutorial, we are going to implement change stream functionality to an existing healthcare application. We will take advantage of SignalR for real-time communication from a Blazor application and hook it up to Change Streams using the C# driver.
By the end, there will be a page to simulate a pharmacy that will be alerted every time a new health condition is added for a patient. In the real world, this might be a great way to notify a pharmacy of a new prescription as it is prescribed to a patient in real-time.
If you just want to view the code, there is a branch of the repo available called
with-change-streams
where you can view the full functioning version.
Prerequisites
This tutorial is kept as simple as possible to follow along, but you will still need a few things in place:
A MongoDB cluster on MongoDB 6.0 or later and your connection string copied to the clipboard
A forked and cloned copy of the repo on the start-change-streams branch with the copied connection string added to app settings
.NET 9
An IDE/text editor that supports .NET/C#, such as Rider or VS Code
Implementing HealthConditionChangeStreamService
There is already lots of skeleton code available to you in the repo to minimise the amount of code required to be added outside the scope of the change stream feature itself. One of these semi-implemented classes available is HeathConditionChangeStreamService.cs
inside the Services folder.
You will see there are already local variables available for the MongoDB Service class, SignalR hub, conditions cache, and an IMongoCollection<Patient>
variable that will allow us to access the patients collection in our database. Let’s go ahead and add the missing code to the ExecuteAsync
method that is already there.
We will do it in small sections and discuss what is happening. But all code will go inside this method. Start by pasting the following code:
var allPatients = await _patientsCollection.Find(_ => true).ToListAsync(stoppingToken);
foreach (var patient in allPatients)
{
var patientId = patient.Id.ToString();
var conditionNames = patient.PatientRecord.HealthConditions.Select(hc => hc.Name).ToList();
_conditionCache[patientId] = conditionNames;
}
Change streams will let you know when any part of a document has been modified in some way. But in the case of array fields, such as the HealthConditions field, it can provide the new array but not what has changed. So we build up a cache first, containing the names of the health conditions as they currently are stored.
Now, we will start to configure our change stream to return the full document, create an empty pipeline as we don’t want to apply any filtering to the stream, and then open the stream. Paste the following code into the method below the code you just added:
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};
var pipeline =
PipelineDefinition<ChangeStreamDocument<Patient>, ChangeStreamDocument<Patient>>.Create(new BsonDocument[]
{ });
using var cursor = await _patientsCollection.WatchAsync(pipeline, options, stoppingToken);
Finally, for this section, we will add a larger chunk of code to loop through the update when it arrives:
await cursor.ForEachAsync(change =>
{
if (change.OperationType != ChangeStreamOperationType.Replace && change.OperationType != ChangeStreamOperationType.Create && change.FullDocument is null)
return;
var patient = change.FullDocument;
var patientId = patient.Id.ToString();
var newConditionNames = patient.PatientRecord.HealthConditions.Select(hc => hc.Name).ToList();
// Retrieve the old list from the cache
if (_conditionCache.TryGetValue(patientId, out var oldConditionNames))
{
// Determine added and removed conditions
var addedConditions = newConditionNames.Except(oldConditionNames).ToList();
var removedConditions = oldConditionNames.Except(newConditionNames).ToList();
var updatedConditions = newConditionNames
.Where(newCondition => oldConditionNames.Any(oldCondition => oldCondition != newCondition))
.ToList();
// Handle added conditions
if (addedConditions.Any())
{
_hubContext.Clients.All.SendAsync("NewHealthConditionAdded", new
{
patient = patient.PatientName,
condition = addedConditions
}, cancellationToken: stoppingToken);
}
// Handle removed conditions
if (removedConditions.Any())
{
_hubContext.Clients.All.SendAsync("HealthConditionRemoved", new
{
patient = patient.PatientName,
condition = removedConditions
}, cancellationToken: stoppingToken);
}
// Handle updated conditions
if (updatedConditions.Any())
{
_hubContext.Clients.All.SendAsync("HealthConditionUpdated", new
{
patient = patient.PatientName,
condition = updatedConditions
}, cancellationToken: stoppingToken);
}
// Update the cache with the new list
_conditionCache[patientId] = newConditionNames;
}
else
{
_conditionCache[patientId] = newConditionNames;
// Send alert if there are any conditions
if (newConditionNames.Any())
{
_hubContext.Clients.All.SendAsync("NewHealthConditionAdded", new
{
patient = patient.PatientName,
condition = newConditionNames
});
}
}
}, stoppingToken);
This is quite a lot of code but what it is doing is fairly straightforward. It is looking at only replaced and new documents because when editing a patient in a different page, the update method in MongoDBService
actually replaces the whole document. So if we selected to watch for updates, it would never trigger.
Then, if it is the correct type of change stream update, it processes the changes, including checking the difference between the cached health conditions array and the new one in the document and sending out an alert to all registered clients of our SignalR hub.
It passes stoppingToken
which is a cancellation token to the driver as part of the loop, so if the cancellation is ever triggered, it stops iterating through the change stream cursor.
Updating Pharmacy page to listen for updates
There is already an existing Pharmacy page in the Components/Pages
folder and it has been set up to have a table for viewing updates, and a hub created to listen to the existing pharmacy hub available in the repo.
All we need to do is paste the following code below the hub declaration in OnInitializedAsync()
and before the call to StartAsync
:
hub.On<object>("NewHealthConditionAdded", (data) =>
{
var patient = (string)((JsonElement)data).GetProperty("patient").GetString();
var conditions = ((JsonElement)data).GetProperty("condition").EnumerateArray().Select(x => x.GetString()).ToList();
InvokeAsync(() =>
{
foreach (var condition in conditions)
{
alerts.Insert(0, $"{patient} has a new condition: {condition}");
}
StateHasChanged();
});
});
hub.On<object>("HealthConditionUpdated", (data) =>
{
var patient = (string)((JsonElement)data).GetProperty("patient").GetString();
var conditions = ((JsonElement)data).GetProperty("condition").EnumerateArray().Select(x => x.GetString()).ToList();
InvokeAsync(() =>
{
foreach (var condition in conditions)
{
alerts.Insert(0, $"{patient} condition {condition} has been updated");
}
StateHasChanged();
});
});
hub.On<object>("HealthConditionRemoved", (data) =>
{
var patient = (string)((JsonElement)data).GetProperty("patient").GetString();
var conditions = ((JsonElement)data).GetProperty("condition").EnumerateArray().Select(x => x.GetString()).ToList();
InvokeAsync(() =>
{
foreach (var condition in conditions)
{
alerts.Insert(0, $"{patient} condition {condition} has been removed");
}
StateHasChanged();
});
});
This is listening for any new, updated, or deleted conditions, getting the value and then adding it to the list of alerts. The UI is then informed that the state has changed so that it will display the alerts. And that’s it!
Testing it out
We now have a working pharmacy alert system that we can test out. Run the application and open two tabs, one for the main application and one set to /pharmacy
. If you add new patients with health conditions or edit an existing patient to add a new health condition, you will see it update in the table of pharmacy alerts!
Summary
And there we go! In a matter of minutes, we have a working application that adds MongoDB Change Stream functionality to a .NET/C# Blazor application, using SignalR.
The full code can be found in the GitHub repo on the with-change-streams
branch.
Why not start applying it to your applications today? If you have any questions or want to share how you got on, you can visit our Community Forums.
Top comments (0)