A Messaging Saga
One of the core concepts in NServiceBus is the idea of a Saga. A saga is a stateful class, responsible for handling a set of related messages.
It is important to understand that the saga is not kept in memory between messages, but persisted to storage. This means that we can have a large number of concurrent sagas without worrying about memory consumption.
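To make the persistence point concrete, here is a minimal sketch of the load, handle, save cycle. It is not NServiceBus code: CounterSagaState, InMemorySagaStore and SagaHost are names I made up for illustration, and the dictionary of JSON strings only stands in for real durable storage.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical saga state: only the data that must survive between messages.
public class CounterSagaState
{
    public int Total { get; set; }
    public int Answered { get; set; }
    public bool Completed { get; set; }
}

// Hypothetical stand-in for durable storage; a real system would use a database.
public class InMemorySagaStore
{
    private readonly Dictionary<Guid, string> rows = new Dictionary<Guid, string>();

    public int Count => rows.Count;

    public CounterSagaState Load(Guid sagaId)
    {
        if (rows.TryGetValue(sagaId, out var json))
            return JsonSerializer.Deserialize<CounterSagaState>(json) ?? new CounterSagaState();
        return new CounterSagaState();
    }

    public void Save(Guid sagaId, CounterSagaState state)
    {
        // A completed saga is removed; anything else is written back as a serialized row.
        if (state.Completed) rows.Remove(sagaId);
        else rows[sagaId] = JsonSerializer.Serialize(state);
    }
}

public static class SagaHost
{
    public static void Start(InMemorySagaStore store, Guid sagaId, int total)
    {
        var state = store.Load(sagaId);
        state.Total = total;
        store.Save(sagaId, state);
    }

    // Load, handle, save: no saga object stays alive between two messages.
    public static void HandleAnswer(InMemorySagaStore store, Guid sagaId)
    {
        var state = store.Load(sagaId);
        state.Answered += 1;
        state.Completed = state.Answered >= state.Total;
        store.Save(sagaId, state);
    }
}
```

Between two calls to HandleAnswer the only trace of the saga is one serialized row, which is why the number of concurrent sagas is bounded by storage rather than by memory.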
Let us take a real example, from my previous post, and see how a saga can take care of it.
[Serializable, Transactional]
public class CandidateVerificationSaga : ISaga<VerifyCandidateMessage>,
    ISaga<ReferenceVerificationMessage>,
    ISaga<CertificationVerificationMessage>
{
    IRepository<IPotentialCandidate> PotentialCandidateRepository { get; set; }
    IBus Bus { get; set; }

    // Saga state, persisted by the infrastructure between messages.
    bool hasRejectedReferencesOrCertifications;
    int clientId;
    int candidateId;
    int totalReferences;
    int totalCertifications;
    int answeredReferences;
    int answeredCertifications;

    // Starts the saga: ask for approval of every reference and certification,
    // then request a timeout so the saga ends even if some answers never arrive.
    [Transaction]
    public virtual void Handle(VerifyCandidateMessage msg)
    {
        clientId = msg.ClientId;
        candidateId = msg.CandidateId;
        IPotentialCandidate candidate = PotentialCandidateRepository.Load(candidateId);
        foreach (Reference reference in candidate.References)
        {
            Bus.Send(new ApproveReferenceMessage(candidate, reference));
        }
        foreach (Certification cert in candidate.Certifications)
        {
            Bus.Send(new ApproveCertificationMessage(candidate, cert));
        }
        totalReferences = candidate.References.Count;
        totalCertifications = candidate.Certifications.Count;
        Bus.Send(new TimeoutMessage(msg.AnswerBy, this, null));
    }

    [Transaction]
    public virtual void Handle(ReferenceVerificationMessage msg)
    {
        if (msg.Approved == false)
        {
            hasRejectedReferencesOrCertifications = true;
            MarkCandidateAsFailedVerification();
            Bus.Send(new ProblematicReference(candidateId, msg.ReferenceId));
        }
        answeredReferences += 1;
        CompleteIfDone();
    }

    [Transaction]
    public virtual void Handle(CertificationVerificationMessage msg)
    {
        if (msg.Approved == false)
        {
            hasRejectedReferencesOrCertifications = true;
            MarkCandidateAsFailedVerification();
            Bus.Send(new ProblematicCertification(candidateId, msg.CertificationId));
        }
        answeredCertifications += 1;
        CompleteIfDone();
    }

    // Called by the infrastructure when the AnswerBy deadline passes.
    [Transaction]
    public virtual void Timeout(object state)
    {
        Complete();
    }

    private void CompleteIfDone()
    {
        if (totalReferences != answeredReferences ||
            totalCertifications != answeredCertifications)
            return;
        Complete();
    }

    private void Complete()
    {
        // Passed: nothing was rejected and at least one answer came back.
        bool passedVerification = hasRejectedReferencesOrCertifications == false &&
            (answeredReferences > 0 || answeredCertifications > 0);
        if (passedVerification)
            MoveCandidateToAvailableCandidatePool();
        Bus.Send(new CandidateVerificationCompleted(candidateId, passedVerification));
        Completed = true;
    }

    private void MarkCandidateAsFailedVerification()
    {
        IPotentialCandidate candidate = PotentialCandidateRepository.Load(candidateId);
        candidate.MarkAsFailedVerification();
    }

    private void MoveCandidateToAvailableCandidatePool()
    {
        IPotentialCandidate candidate = PotentialCandidateRepository.Load(candidateId);
        candidate.MoveToAvailablePool();
    }
}
Notice that we have several entry points into the saga: the Timeout and the Handle(XyzMessage) methods. The saga will handle only a single message at a point in time, so we don't have to worry about concurrency. We are also not going to worry about state; we just store it in instance variables and forget about it.
The programming model is very natural, I believe. We get called when messages that we are interested in arrive, and the infrastructure correlates them for us, so our code doesn't have to care about that.
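As a rough illustration of what "the infrastructure correlates them" could mean, here is a small sketch of a dispatcher that routes each reply to the right saga state by a correlation id and lets only one message at a time touch a given saga. This is an assumption-laden toy, not how NServiceBus actually does it: ReplyMessage, VerificationState, SagaDispatcher and the CorrelationId property are all hypothetical, and the per-saga lock is only one possible way to serialize handling.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical reply message carrying the id of the saga it belongs to.
public class ReplyMessage
{
    public Guid CorrelationId { get; set; }
    public bool Approved { get; set; }
}

// Hypothetical per-saga state.
public class VerificationState
{
    public int Answered;
    public bool HasRejections;
}

public class SagaDispatcher
{
    private readonly ConcurrentDictionary<Guid, VerificationState> sagas =
        new ConcurrentDictionary<Guid, VerificationState>();

    public VerificationState Dispatch(ReplyMessage msg)
    {
        // Correlation: find (or create) the state that this message belongs to.
        var state = sagas.GetOrAdd(msg.CorrelationId, _ => new VerificationState());

        // One message at a time per saga: other sagas are not blocked by this lock.
        lock (state)
        {
            state.Answered += 1;
            if (!msg.Approved) state.HasRejections = true;
        }
        return state;
    }
}
```

With this in place the handler body can use a plain `Answered += 1` safely, because no two messages for the same saga ever run that line at the same moment.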
Thoughts?
Comments
loving it. It is what WF does, but WF persists a blob to the database, which quite frankly s*cks.
"The saga will handle only a single message at a point in time"
is that guaranteed?
otherwise stuff like answeredReferences+=1 might impose some problems
Hi Ayende,
Looks like a nice alternative to WWF, as Ruurd suggested - much better than attempting to integrate WWF with a messaging technology, the two concepts seem to belong together.
Just a quick question on the code - I could be missing it but there appears to be a Completed property that has no origin?
Nick
Ayende, in this example, what mechanism do you use to inject the dependencies (IRepository<IPotentialCandidate>) when writing handlers for nServiceBus? I have been using Windsor for over a year now on other projects and just picked up nServiceBus. I am trying to determine whether I swallow the pill and learn how to use Spring or somehow try to introduce Windsor into my nServiceBus application.
What do you think? Seems like having to maintain two separate IoC containers within the same app might lead to confusion and limitations. However, from my cursory review of Spring I am finding that I miss Windsor already. In this example, I assume that the nServiceBus plumbing is brokering the CandidateVerificationSaga, so I guess I would assume that it is also injecting the IRepository dependency, is that correct?
Can you explain the Timeout concept a bit? I fail to figure out why you call Complete() in your timeout handler. Shouldn't you do some kind of "retry" in case of a timeout?
Yes, it is.
How it is guaranteed is a bit complex, but it is.
Nick,
Yes, I forgot to add it to the code. It is an auto property.
Andrew,
Using two containers sucks and should be avoided at all costs, period.
NSB uses Spring as its default container, but is not tied to it. You can replace it (you would need to write an adapter for the IBuilder interface, but that is basically a trivial thing).
When you do that, Windsor is responsible for handling IoC.
I'd say, Go Udi !
Let us say that we have specified that a verification process should take 14 days.
At the end of those 14 days, we have to decide what to do. In this case, I decided that I would assume the candidate is valid if at least one reference or certification came out okay, and no certification/reference came out invalid.
Retrying would mean waiting another 14 days, and then what?
Andreas,
Time is a business concept, as described in the rule: "verification process should take at most 14 days" and as such needs to be treated explicitly.
The technological timeout that you were referring to is handled automatically by the infrastructure:
You try to connect to the DB in your code, it times out and throws an exception, which bubbles up and breaks the message-level transaction, causing the message to be returned to the queue and all other work to be rolled back. Once the message is in the queue again, some thread will pick it up and try to handle it - effectively retrying.
Hope that makes sense.
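The retry-by-rollback behaviour described above can be sketched roughly like this. It is a toy under stated assumptions, not the real infrastructure: RetryingReceiver and Drain are hypothetical names, an in-memory Queue<string> stands in for a transactional queue, and putting the message back in the catch block stands in for the transaction rolling back.

```csharp
using System;
using System.Collections.Generic;

public static class RetryingReceiver
{
    // Handles messages until the queue is empty or maxAttempts is reached.
    // Returns the number of handler attempts that were made.
    public static int Drain(Queue<string> queue, Action<string> handler, int maxAttempts)
    {
        int attempts = 0;
        while (queue.Count > 0 && attempts < maxAttempts)
        {
            string message = queue.Dequeue();
            attempts++;
            try
            {
                handler(message);        // success: the "transaction" commits
            }
            catch (Exception)
            {
                queue.Enqueue(message);  // failure: the message goes back to the queue
            }
        }
        return attempts;
    }
}
```

The handler itself contains no retry logic at all; a failure simply means the same message shows up again later.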