A Messaging Saga


One of the core concepts in NServiceBus is the idea of a Saga. A saga is a stateful class, responsible for handling a set of related messages.

It is important to understand that the saga is not kept in memory between messages, but persisted to storage. This means that we can have a large number of concurrent sagas without worrying about memory consumption.
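To make the persistence point concrete, here is a minimal sketch of what an infrastructure could do around each message: load the saga state from storage, let the handler change it, and write it back. SagaState, SagaStore, and the JSON-in-a-dictionary storage are assumptions made up for illustration; this is not NServiceBus's actual persistence API.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical saga state; System.Text.Json serializes its public properties.
public class SagaState
{
    public int TotalReferences { get; set; }
    public int AnsweredReferences { get; set; }
}

// Hypothetical store: a dictionary of JSON strings stands in for a database table.
public class SagaStore
{
    private readonly Dictionary<Guid, string> rows = new Dictionary<Guid, string>();

    // Load the state (or start fresh), run the handler, persist the result.
    public void Handle(Guid sagaId, Action<SagaState> handler)
    {
        SagaState state = rows.TryGetValue(sagaId, out var json)
            ? JsonSerializer.Deserialize<SagaState>(json)
            : new SagaState();

        handler(state);

        // Only the serialized form survives; no live saga object is held between messages.
        rows[sagaId] = JsonSerializer.Serialize(state);
    }

    // Read back the stored state, for inspection only.
    public SagaState Peek(Guid sagaId)
    {
        return JsonSerializer.Deserialize<SagaState>(rows[sagaId]);
    }
}
```

Each call to Handle works on a freshly deserialized object, so the number of sagas in flight is limited by storage rather than by memory.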

Let us take a real example, from my previous post, and see how a saga can take care of it.

[Serializable, Transactional]
public class CandidateVerificationSaga 
	: ISaga<VerifyCandidateMessage>,
	  ISaga<ReferenceVerificationMessage>,
	  ISaga<CertificationVerificationMessage>
{
	IRepository<IPotentialCandidate> PotentialCandidateRepository { get; set; }
	
	IBus Bus { get; set; }
	
	bool hasRejectedReferencesOrCertifications;
	
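	int clientId;
	// Type assumed to match clientId; needed by the later handlers.
	int candidateId;
	
	int totalReferences;
	int totalCertifications;
	
	int answeredReferences;
	int answeredCertifications;

	[Transaction]
	public virtual void Handle(VerifyCandidateMessage msg)
	{
		clientId = msg.ClientId;
		// Remember the candidate so later messages can refer to it.
		candidateId = msg.CandidateId;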
		
		IPotentialCandidate candidate = PotentialCandidateRepository.Load(msg.CandidateId);
		
		foreach(Reference reference in candidate.References)
		{
			Bus.Send(new ApproveReferenceMessage(candidate, reference));
		}
		
		foreach(Certification cert in candidate.Certifications)
		{
			Bus.Send(new ApproveCertificationMessage(candidate, cert));
		}
		
		totalReferences = candidate.References.Count;
		totalCertifications = candidate.Certifications.Count;
		
		Bus.Send(new TimeoutMessage(msg.AnswerBy, this, null));
	}
	
	[Transaction]
	public virtual void Handle(ReferenceVerificationMessage msg)
	{
		if(msg.Approved == false)
		{
			hasRejectedReferencesOrCertifications = true;
			MarkCandidateAsFailedVerification();
			Bus.Send(new ProblematicReference(candidateId, msg.ReferenceId));
		}
		
		answeredReferences+=1;
		
		CompleteIfDone();
	}
	
	[Transaction]
	public virtual void Handle(CertificationVerificationMessage msg)
	{
		if(msg.Approved == false)
		{
			hasRejectedReferencesOrCertifications = true;
			MarkCandidateAsFailedVerification();
			Bus.Send(new ProblematicCertification(candidateId, msg.CertificationId));
		}
		
		answeredCertifications+=1;
		
		CompleteIfDone();
	}
	
	[Transaction]
	public virtual void Timeout(object state)
	{
		Complete();
	}
	
	private void CompleteIfDone()
	{
		if(totalReferences != answeredReferences ||
			totalCertifications != answeredCertifications)
			return;
		
		Complete();
	}
	
	private void Complete()
	{
		bool passedVerification = hasRejectedReferencesOrCertifications == false && 
			(answeredReferences > 0 || answeredCertifications > 0);
		
		if(passedVerification)
			MoveCandidateToAvailableCandidatePool();
		
		Bus.Send(new CandidateVerificationCompleted(candidateId, passedVerification));
		Completed = true;
	}
	
	private void MarkCandidateAsFailedVerification()
	{
		// No message is in scope here, so use the candidate id stored on the saga.
		IPotentialCandidate candidate = PotentialCandidateRepository.Load(candidateId);
		candidate.MarkAsFailedVerification();
	}
	
	private void MoveCandidateToAvailableCandidatePool()
	{
		IPotentialCandidate candidate = PotentialCandidateRepository.Load(candidateId);
		candidate.MoveToAvailablePool();
	}
}

Notice that we have several entry points into the saga: the Timeout method and the Handle(XyzMessage) methods. A saga instance handles only a single message at a time, so we don't have to worry about concurrency. We don't have to worry about state either; we just store it in instance variables and forget about it.
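As a rough illustration of the "one message at a time" guarantee, here is one way a single-process dispatcher could serialize messages per saga. SerializingDispatcher and CountingSaga are hypothetical names, and the per-saga lock is an assumption for the sketch; a real bus may well rely on transactions or storage-level concurrency control instead.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical dispatcher: one lock object per saga id, so two messages for the
// same saga never run at once, while different sagas can proceed in parallel.
public class SerializingDispatcher
{
    private readonly ConcurrentDictionary<Guid, object> locks =
        new ConcurrentDictionary<Guid, object>();

    public void Dispatch(Guid sagaId, Action handler)
    {
        // GetOrAdd hands every caller the same lock object for a given saga id.
        object gate = locks.GetOrAdd(sagaId, _ => new object());
        lock (gate)
        {
            handler();
        }
    }
}

// The saga code itself stays naive: a plain increment with no synchronization.
public class CountingSaga
{
    public int Answered;

    public void Handle()
    {
        Answered += 1;
    }
}
```

Because the dispatcher owns the locking, the handler can mutate its fields as if it were single threaded, which is the property the saga above depends on.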

The programming model is very natural, I believe. We get called when messages that we are interested in arrive, and the infrastructure correlates them to the right saga instance, so our code doesn't have to care about that.
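Correlation can be pictured like this: every message carries a saga id, and a router uses it to pick the instance before calling Handle. This is only a sketch under assumptions; SagaMessage, ReferenceAnswered, TinySaga, and Correlator are invented names, and the in-memory dictionary is there for brevity where a real infrastructure would load the saga from storage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message base: the infrastructure stamps each message with a saga id.
public abstract class SagaMessage
{
    public Guid SagaId { get; set; }
}

public class ReferenceAnswered : SagaMessage
{
    public bool Approved { get; set; }
}

// The saga only looks at message contents, never at the correlation id.
public class TinySaga
{
    public int Answered { get; private set; }
    public bool HasRejection { get; private set; }

    public void Handle(ReferenceAnswered msg)
    {
        Answered += 1;
        if (!msg.Approved) HasRejection = true;
    }
}

// Hypothetical router: finds (or creates) the saga for a message's id, then dispatches.
public class Correlator
{
    // In-memory for brevity; stands in for persisted saga storage.
    private readonly Dictionary<Guid, TinySaga> sagas = new Dictionary<Guid, TinySaga>();

    public TinySaga Route(ReferenceAnswered msg)
    {
        if (!sagas.TryGetValue(msg.SagaId, out var saga))
        {
            saga = new TinySaga();
            sagas[msg.SagaId] = saga;
        }
        saga.Handle(msg);
        return saga;
    }
}
```

Two messages with the same SagaId reach the same TinySaga, while a message with a different id gets its own instance, and none of that logic appears in the saga class.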

Thoughts?