Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

time to read 1 min | 192 words

Rhino Service Bus comes with two default implementations of saga persistence (OptimisticDistributedHashTableSagaSatePersister & DistributedHashTableSagaSatePersister). Both of them rely on Rhino DHT to actually handle the persistence, but they behave quite differently once you start looking at them.

The persistence mechanism is the same; the difference between them is how they handle concurrency. OptimisticDistributedHashTableSagaSatePersister is very simple to reason about: it uses optimistic concurrency. If there is a concurrency conflict, it will throw and force re-evaluation of the message. You opt in to the OptimisticDistributedHashTableSagaSatePersister by extending the marker interface SupportsOptimisticConcurrency.

DistributedHashTableSagaSatePersister is a more complex concurrency solution, which will never lose a write, even if there is a conflict. This requires you to implement merge conflict resolution. It is significantly more complex, and is probably only worth it where you really care about writes always succeeding.

In order to use DistributedHashTableSagaSatePersister properly, your saga needs to implement Orchestrates<MergeSagaState>, which allows the saga to take action upon merges. In addition, you need to implement ISagaStateMerger for your particular saga state. This is where the merge logic lives.
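To make the optimistic option a bit more concrete, here is a minimal sketch of the general technique: a write succeeds only if the version the writer read is still the current one. This is not the Rhino Service Bus or Rhino DHT implementation; `InMemoryVersionedStore` and `ConcurrencyException` are hypothetical names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type, for illustration only.
public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical in-memory store; not part of Rhino Service Bus or Rhino DHT.
public class InMemoryVersionedStore
{
    private readonly Dictionary<string, int> versions = new Dictionary<string, int>();
    private readonly Dictionary<string, string> values = new Dictionary<string, string>();
    private readonly object sync = new object();

    // Returns the current version for a key, or 0 if nothing was written yet.
    public int GetVersion(string key)
    {
        lock (sync)
        {
            int version;
            return versions.TryGetValue(key, out version) ? version : 0;
        }
    }

    // Returns the current value for a key, or null if nothing was written yet.
    public string GetValue(string key)
    {
        lock (sync)
        {
            string value;
            values.TryGetValue(key, out value);
            return value;
        }
    }

    // Writes only if the caller saw the latest version; otherwise throws,
    // which is the point where a bus would retry the message.
    public int Put(string key, string value, int expectedVersion)
    {
        lock (sync)
        {
            int current;
            versions.TryGetValue(key, out current); // stays 0 when the key is missing
            if (current != expectedVersion)
                throw new ConcurrencyException(
                    "Expected version " + expectedVersion + " but found " + current);
            versions[key] = current + 1;
            values[key] = value;
            return current + 1;
        }
    }
}
```

If two handlers read the same version and both try to write, the second `Put` throws. Re-evaluating the message then starts from the state that the first handler saved.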

time to read 21 min | 4061 words

Concurrency is a tough topic, fraught with problems, pitfalls and nasty issues. This is especially the case when you try to build distributed, inherently parallel systems. I have been dealing with the topic quite a lot recently and I have created several solutions (none of them are originally mine, mind you).

There aren’t that many good solutions out there; most of them boil down to: “suck it up and deal with the complexity.” In this case, I want to deal with the complexity in a consistent fashion (no one-off solutions) and in a way that I can handle without first meditating on the import of socks.

Let us see if I can come up with a good example. We have a saga that we use to check whether a particular user has acceptable credit to buy something from us. The logic is that we need to verify with at least 2 credit card bureaus, and the average must be over 700. (This logic has nothing to do with the real world, since I just dreamt it up, by the way). Here is a simple implementation of a saga that can deal with those requirements:

    public class AccpetableCreditSaga : ISaga<AccpetableCreditState>,
      InitiatedBy<IsAcceptableAsCustomer>,
      Orchestrates<CreditCardScore>,
      Orchestrates<MergeSagaState>
    {
      IServiceBus bus;
      public bool IsCompleted {get;set;}
      public Guid Id {get;set;}
      // The saga's state, kept between message invocations
      public AccpetableCreditState State {get;set;}

      public AccpetableCreditSaga (IServiceBus bus)
      {
        this.bus = bus;
      }

      public void Consume(IsAcceptableAsCustomer message)
      {
        // Ask all three bureaus; we only need two answers
        bus.Send(
          new Equifax.CheckCreditFor{Card = message.Card},
          new Experian.CheckCreditFor{Card = message.Card},
          new TransUnion.CheckCreditFor{Card = message.Card}
          );
      }

      public void Consume(CreditCardScore message)
      {
        State.Scores.Add(message);

        TryCompleteSaga();
      }

      public void Consume(MergeSagaState message)
      {
        // The state was merged; check again whether we can complete
        TryCompleteSaga();
      }

      public void TryCompleteSaga()
      {
        if(State.Scores.Count < 2)
          return;

        bus.Publish(new CreditScoreAcceptable
        {
          CorrelationId = Id,
          IsAcceptable = State.Scores.Average(x=>x.Score) > 700
        });
        IsCompleted = true;
      }
    }

We have this strange MergeSagaState message, but other than that, it should be pretty obvious what is going on in here. It should be equally obvious that we have a serious problem. Let us say that we get two reply messages with credit card scores at the same time. We will create two instances of the saga that run in parallel, each of them getting its own copy of the saga’s state. Each instance sees only a single score, so neither of them meets the end condition for the saga. Even though in practice we have gotten all the messages we need, because we handled them in parallel, we had no chance to see both changes at the same time. This means that any logic that requires a full picture of what is going on isn’t going to work.

Rhino Service Bus solves the issue by putting the saga’s state into Rhino DHT. This means that a single saga may have several states at the same time. Merging them together is also something that the bus will take care of. Merging the different parts is inherently an issue that cannot be solved generically; there is no generic merge algorithm that you can use. Rhino Service Bus defines an interface that allows you to deal with this issue in a clean manner and supply whatever business logic is required to merge the different versions.

Here is an example of how we can merge the different versions together:

    public class AccpetableCreditStateMerger : ISagaStateMerger<AccpetableCreditState>
    {
      public AccpetableCreditState Merge(AccpetableCreditState[] states)
      {
        return new AccpetableCreditState
        {
          // Keep the highest score reported by each bureau
          Scores = states.SelectMany(x=>x.Scores)
            .GroupBy(x=>x.Bureau)
            .Select(x => new CreditCardScore
            {
              Bureau = x.Key,
              Score = x.Max(y=>y.Score)
            }).ToList()
        };
      }
    }

Note that this is notepad code, so it may contain errors, but the actual intention should be clear. We accept an array of states that need to be merged, find the highest score from each bureau and return the merged state.

Whenever Rhino Service Bus detects that the saga is in a conflicted state, it will post a MergeSagaState message to the saga. This will merge the saga’s state and call Consume(MergeSagaState), in which the saga gets to decide what it wants to do about this (usually inspect the state to see if we missed anything). This also works for completing a saga, by the way. You cannot complete a saga in an inconsistent state; you will get called again with Consume(MergeSagaState) to deal with that.

The state merger is also a good place to deal with concurrency compensating actions, for example if we notice in the merger that we performed some action twice and need to revert one of them. In general, it is better to avoid having to do so, but that is the place for this logic.

time to read 5 min | 918 words

I have talked about Rhino DHT at length, and the versioning story that exists there. What I haven’t talked about is why I built it. Or, to be rather more exact, the actual use case that I had in mind.

Jason Diamond had pointed out a problem with the way sagas work with Rhino Service Bus.

Are BaristaSaga objects instantiated per message? If so, can two different instances be consuming different messages concurrently?

The reason I ask is because it looks like handling the PrepareDrink message could take some time. Is it possible that a PaymentComplete message could come in before the PrepareDrink message is finished being handled?

If the two instances of BaristaSaga have their own instance of BaristaState, I can see the GotPayment value set by handling the PaymentComplete message getting lost.

If the two instances of BaristaSaga share the same instance of BaristaState, do I now have to worry about synchronizing changes to the state across all of the sagas? Also, wouldn't this prevent having multiple barista "servers" handling messages since they wouldn't be able to share instances across processes/machines.

The answer to that is that yes, a saga can execute concurrently. Not only that, but it can execute concurrently on different machines. That puts us in somewhat of a bind regarding consistent state.

There are several options that we can use to resolve the issue. One of them is to ensure that this cannot happen by locking on a shared resource when executing the saga (commonly done by opening a transaction on the saga’s row). That can significantly limit the system’s scalability. Another option is to persist the saga’s state in a way that ensures that we have no conflicts. One way of doing that is to persist the actual state change itself, which allows us to replay the changes and bring the object to a consistent state. Concurrent updates don’t bother us because we only append changes; we aren’t actually modifying existing data.
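A rough sketch of the second option, persisting the change rather than the state, might look like the following. All of the names here (`SagaChange`, `ReplayedState`, `SagaChangeLog`) are hypothetical, and a real store would need to be durable and shared between machines; the sketch only shows why concurrent appends do not overwrite each other.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical change kinds for a barista-style saga.
public enum SagaChange { DrinkPrepared, PaymentReceived }

// Hypothetical state that is rebuilt from the recorded changes.
public class ReplayedState
{
    public bool DrinkIsReady { get; set; }
    public bool GotPayment { get; set; }
}

// Hypothetical append-only log: each handler records what happened
// instead of overwriting a shared state object.
public class SagaChangeLog
{
    private readonly List<SagaChange> changes = new List<SagaChange>();
    private readonly object sync = new object();

    // Appending never touches what another writer has recorded.
    public void Append(SagaChange change)
    {
        lock (sync) { changes.Add(change); }
    }

    // Rebuilds the current state by replaying every recorded change in order.
    public ReplayedState Replay()
    {
        var state = new ReplayedState();
        lock (sync)
        {
            foreach (var change in changes)
            {
                switch (change)
                {
                    case SagaChange.DrinkPrepared:
                        state.DrinkIsReady = true;
                        break;
                    case SagaChange.PaymentReceived:
                        state.GotPayment = true;
                        break;
                }
            }
        }
        return state;
    }
}
```

Two handlers can append `DrinkPrepared` and `PaymentReceived` in any order, and a later `Replay()` sees both, so neither update is lost.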

That might require some careful thinking, however, to avoid a case where a saga that is executing concurrently steps on its own feet without paying attention. I strongly dislike anything that requires careful thinking. It is like saying that C++ has no memory leak issues, it just requires some careful thinking.

For RSB, I wanted to do better than that. I selected Rhino DHT as the persistence store for the default saga state (you can still do other things, of course). That means that concurrency is very explicit. If you get to a point where there are two concurrently executing instances of the saga, both of their states are going to go to Rhino DHT. Since they both derive from the same version, Rhino DHT is going to keep both state changes around.

The next time that we need the state for that particular saga, we are actually going to get both states. At that point, we introduce the ISagaStateMerger:

    public interface ISagaStateMerger<TState>
        where TState : IVersionedSagaState
    {
        TState Merge(TState[] states);
    }

This allows us to handle the notion of concurrency resolution in a very explicit manner. We get the appropriate state merger from the container and use that to merge the states back to a consistent state, which we then pass to the saga to continue its execution.
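The flow described above (two writers start from the same version, both results are kept, and the next reader merges them) can be sketched with a toy in-memory store. This is only an illustration under my own assumptions: `SiblingStore` and `MergeFlowSketch` are hypothetical names, and the real Rhino DHT API is different.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a versioned store; Rhino DHT's real API differs.
public class SiblingStore<T>
{
    private readonly Dictionary<string, List<KeyValuePair<int, T>>> items =
        new Dictionary<string, List<KeyValuePair<int, T>>>();
    private int lastVersion;

    // Returns every live version of the key; more than one means a conflict.
    public List<KeyValuePair<int, T>> Get(string key)
    {
        List<KeyValuePair<int, T>> versions;
        return items.TryGetValue(key, out versions)
            ? versions.ToList()
            : new List<KeyValuePair<int, T>>();
    }

    // A write supersedes only the versions the writer had read (its parents).
    // Versions written concurrently by someone else stay around as siblings.
    public int Put(string key, T value, params int[] parentVersions)
    {
        List<KeyValuePair<int, T>> versions;
        if (!items.TryGetValue(key, out versions))
        {
            versions = new List<KeyValuePair<int, T>>();
            items[key] = versions;
        }
        versions.RemoveAll(v => Array.IndexOf(parentVersions, v.Key) >= 0);
        lastVersion++;
        versions.Add(new KeyValuePair<int, T>(lastVersion, value));
        return lastVersion;
    }
}

public static class MergeFlowSketch
{
    // Loads the state, merging sibling versions first if there is a conflict.
    public static T Load<T>(SiblingStore<T> store, string key, Func<T[], T> merge)
    {
        var versions = store.Get(key);
        if (versions.Count == 0) return default(T);
        if (versions.Count == 1) return versions[0].Value;

        var merged = merge(versions.Select(v => v.Value).ToArray());
        // The merged state replaces all of the conflicting versions.
        store.Put(key, merged, versions.Select(v => v.Key).ToArray());
        return merged;
    }
}
```

The `merge` delegate plays the role of `ISagaStateMerger<TState>.Merge`: the store only detects that there is more than one version, and the business logic decides how to combine them.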

There is just one additional twist. A saga cannot complete until it is in a consistent state, so if the saga completes while it is in an inconsistent state, we will call the saga again (after resolving the conflict) and let it handle the final state conflict before performing the actual completion.

time to read 19 min | 3714 words

In a messaging system, a saga orchestrates a set of messages. The main benefit of using a saga is that it allows us to manage the interaction in a stateful manner (easy to think and reason about) while actually working in a distributed and asynchronous environment.

In Rhino Service Bus, I built in the notion of sagas from the beginning. I initially went with an approach that mixes the saga’s behavior and the saga’s state in the same class. That did not turn out so well. While it works for simple matters, anything of sufficient complexity started to cause issues, mainly around managing dependencies and managing state. It is possible to get this worked out, but I decided to follow in Udi’s footsteps and create an explicit separation between the two.

Here is how it works. We have the state class:

    public class BaristaState
    {
        public bool DrinkIsReady { get; set; }

        public bool GotPayment { get; set; }

        public string Drink { get; set; }
    }

This is just a standard class, nothing special here. But here is the actual saga class. This contains the behavior for the saga, with the state being maintained in the state class.

    public class BaristaSaga :
        ISaga<BaristaState>,
        InitiatedBy<PrepareDrink>,
        Orchestrates<PaymentComplete>
    {
        private readonly IServiceBus bus;

        public BaristaState State { get; set; }

        public Guid Id { get; set; }

        public bool IsCompleted { get; set; }

        public BaristaSaga(IServiceBus bus)
        {
            this.bus = bus;
            State = new BaristaState();
        }

        public void Consume(PrepareDrink message)
        {
            State.Drink = message.DrinkName;

            // Simulate a drink that takes a while to prepare
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine("Barista: preparing drink: " + State.Drink);
                Thread.Sleep(500);
            }
            State.DrinkIsReady = true;
            SubmitOrderIfDone();
        }

        public void Consume(PaymentComplete message)
        {
            Console.WriteLine("Barista: got payment notification");
            State.GotPayment = true;
            SubmitOrderIfDone();
        }

        private void SubmitOrderIfDone()
        {
            // Complete only once the drink is ready and paid for
            if (State.GotPayment && State.DrinkIsReady)
            {
                Console.WriteLine("Barista: drink is ready");
                bus.Publish(new DrinkReady
                {
                    CorrelationId = Id,
                    Drink = State.Drink
                });
                IsCompleted = true;
            }
        }
    }

There are a few things to notice in here. The saga class is a standard component; we use DI to inject dependencies into the class so it can perform whatever it is that it wants. The State property is used to maintain the state of the saga between message invocations.

This results in a simpler design for some parts of the code, and I think that overall it is a very simple model to talk and reason about.

time to read 12 min | 2264 words

One of the requirements that came up on my current project was the need to secure specific fields in a message during transit. I thought about it a while before I decided that this is something that should be made explicit in the message contract.

Here is an example from the tests:

    public class ClassWithSecretField
    {
        public WireEcryptedString ShouldBeEncrypted
        {
            get; set;
        }
    }

WireEcryptedString (that is how the type name is spelled in the code) is a type that is encrypted on the wire, as the name suggests.

And defining the keys in the configuration is done in this way:

    <facility id="rhino.esb" >
      <bus threadCount="1"
           numberOfRetries="5"
           endpoint="msmq://localhost/test_queue2"
             />
      <messages>
        <add name="Rhino.ServiceBus.Tests"
             endpoint="msmq://localhost/test_queue"/>
        <add name="Rhino.ServiceBus.Tests"
             endpoint="msmq://localhost/test_queue2"/>
      </messages>
      <security>
        <key>f/gdDWbDqHRvpqdRbTs3mxhGdZh9qCaDrasxJGXl+5s=</key>
      </security>
    </facility>

On the wire, it has the following format:

    <?xml version='1.0' encoding='utf-8'?>
    <esb:messages 
      xmlns:esb='http://servicebus.hibernatingrhinos.com/2008/12/20/esb' 
      xmlns:tests.classwithsecretfield='Rhino.ServiceBus.Tests.When_Security_Is_Specified_In_Config+ClassWithSecretField, Rhino.ServiceBus.Tests'
      xmlns:datastructures.wireecryptedstring='Rhino.ServiceBus.DataStructures.WireEcryptedString, Rhino.ServiceBus' xmlns:string='string'>
      <tests.classwithsecretfield:ClassWithSecretField>
        <datastructures.wireecryptedstring:ShouldBeEncrypted>
          <string:Value iv='0yL9+t0uyDy9NeP7CU1Wow=='>q9a10IFuRxrzFoZewfdOyg==</string:Value>
        </datastructures.wireecryptedstring:ShouldBeEncrypted>
      </tests.classwithsecretfield:ClassWithSecretField>
    </esb:messages>

Following the Rhino Service Bus philosophy, it is quite a neat solution.

The actual encryption is done using a 256-bit key with Rijndael (AES). I considered other approaches, but all of them had quite a big overhead from a manageability perspective.
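For readers who want to see what encrypting a single field with a shared 256-bit key and a per-value IV can look like, here is a minimal sketch using the .NET `Aes` class. It is not the Rhino Service Bus code: the `WireCryptoSketch` class, the UTF-8 encoding and the default CBC/PKCS7 settings are my assumptions, chosen only so that the output has the same shape as the wire format above (a Base64 `iv` plus a Base64 value).

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, for illustration only.
public static class WireCryptoSketch
{
    // Encrypts a string with the shared key and a freshly generated IV.
    // Both parts are returned as Base64 so they can be embedded in XML.
    public static KeyValuePair Encrypt(string plainText, byte[] key)
    {
        using (var aes = Aes.Create())
        {
            aes.Key = key;      // 32 bytes = 256 bits
            aes.GenerateIV();   // a new IV for every value
            using (var encryptor = aes.CreateEncryptor())
            {
                var input = Encoding.UTF8.GetBytes(plainText);
                var output = encryptor.TransformFinalBlock(input, 0, input.Length);
                return new KeyValuePair(
                    Convert.ToBase64String(aes.IV),
                    Convert.ToBase64String(output));
            }
        }
    }

    // Decrypts a value using the shared key and the IV sent with the value.
    public static string Decrypt(string iv, string cipherText, byte[] key)
    {
        using (var aes = Aes.Create())
        {
            aes.Key = key;
            aes.IV = Convert.FromBase64String(iv);
            using (var decryptor = aes.CreateDecryptor())
            {
                var input = Convert.FromBase64String(cipherText);
                var output = decryptor.TransformFinalBlock(input, 0, input.Length);
                return Encoding.UTF8.GetString(output);
            }
        }
    }

    // Small holder for the two Base64 parts (hypothetical).
    public class KeyValuePair
    {
        public KeyValuePair(string iv, string cipherText)
        {
            Iv = iv;
            CipherText = cipherText;
        }

        public string Iv { get; private set; }
        public string CipherText { get; private set; }
    }
}
```

The key in the configuration sample is Base64, so `Convert.FromBase64String` would turn it into the 32 bytes that `aes.Key` expects. Note that this sketch only encrypts; it does not authenticate the message, so tampering on the wire is not detected by it.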

There are some interesting implications of the implementation that deserve some discussion. Let us assume that you send such a message to another endpoint.

If the endpoint…

  • has the same key as us, the message will be decrypted and everything works.
  • doesn’t have any security defined, the message will deserialize successfully, but any WireEcryptedString field will contain the encrypted value.
  • has a different key defined, message deserialization will fail.

Trying to send a message that contains a WireEcryptedString will throw; we do not allow such an action.

And now you can tell me how many holes there are in my system :-)

time to read 3 min | 575 words

One of the major hurdles in distributed systems is trying to understand how they work. Different parts are running at different places and sometimes at different times. Standard debugging usually breaks down at this point, because no one has yet invented a non-sequential debugger that would make sense to humans.

We are left with trying to understand what is going on in the system based on a pretty old notion, the system logs. With Rhino Service Bus, this was one of the things that I really cared about, so I made this into a first class concept. And no, you don’t get to hunt through a 3GB text file. The idea is that each message (and message interaction) in the system can be captured.

The configuration for this is quite simple:

    <bus threadCount="1"
             numberOfRetries="5"
             logEndpoint="msmq://localhost/starbucks.log"
             endpoint="msmq://localhost/starbucks.backend"
             />

Once we have done that, each message is copied to the log queue. But it is not just the messages themselves: we also record when a message arrived, how long it took to process, why it failed, etc.

Using this approach, you can build tools that listen to the log queue and display the information in ways that make sense to humans. For example, we can create a flow of a saga or conversation, start gathering data about the time it takes to process certain messages, or detect SLA violations.
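As an illustration of the kind of tool this enables, here is a small sketch that takes entries read from the log queue and reports which message types are, on average, slower than an agreed limit. The `LogEntrySketch` shape is hypothetical (the real log messages may carry different fields), and reading from MSMQ is left out entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of a log entry; the real log messages may differ.
public class LogEntrySketch
{
    public string MessageType { get; set; }
    public TimeSpan ProcessingTime { get; set; }
    public bool Failed { get; set; }
}

public static class LogQueueReport
{
    // Returns the message types whose average processing time exceeds the limit,
    // sorted by name.
    public static List<string> FindSlaViolations(
        IEnumerable<LogEntrySketch> entries, TimeSpan limit)
    {
        return entries
            .GroupBy(e => e.MessageType)
            .Where(g => g.Average(e => e.ProcessingTime.TotalMilliseconds)
                        > limit.TotalMilliseconds)
            .Select(g => g.Key)
            .OrderBy(name => name)
            .ToList();
    }
}
```

The same grouping idea could be keyed on a saga or conversation id instead of the message type to reconstruct the flow of a single conversation.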

time to read 2 min | 233 words

Yesterday I finally completed the Starbucks sample for Rhino Service Bus. It is surprising to see how many aspects that little sample required.

There are several highlights in the sample.

  • There are three actors in the application:
    • Barista
    • Cashier
    • Customer
  • There is zero setup necessary: Rhino Service Bus will create the queues if they don’t already exist. Again, the idea is to reduce moving parts.
  • All three actors are running in the same process – but each is running in a different AppDomain.
    Note that this is a common deployment choice for development, but not one that I would use for production.
    The idea is that this makes it significantly easier to debug & develop a distributed application.
  • There is very little configuration, and a lot of conventions about how to figure out which consumers to use and how to build them.
  • The use of sagas & conversations is demoed. The entire buying process is a single conversation composed of several sagas.
  • The customer actor is showing how we can create instance & temporary subscriptions.
time to read 2 min | 284 words

One of the main design goals for Rhino Service Bus was locality and independence of the end point. What do I mean by that?

Well, a Rhino ESB endpoint should have no dependency on any external service in order to operate. Examples of common external services are things like subscription services, timeout managers and error reporting. I already discussed how I handle errors, timeouts and subscriptions with Rhino Service Bus. So the question is why independence is such an important concept.

The answer is quite simple: reduce the number of moving parts. If I need to have a timeout service running to have my timeouts working, this is a problem. If subscriptions don’t work because I didn’t start the subscription service, that is a problem. Those are problems because those things will happen, and figuring out what exactly is going on is hard, painful and not really recommended. Well, the first time you run into this, at least. As such, I sought to make sure that there would be only a single place that you would have to look in order to get things working.

There are other aspects of this as well, frankly. Fewer moving parts usually mean less complexity, but the most important issue is that we have much better options for debugging.

The only place that you need to check for anything is the endpoint.

We currently don’t have tooling (because we rely on the default MSMQ tools to do a fairly adequate job for now), but a few spikes that I ran have shown that building some really nice tooling is quite simple, and the approach is the same for everything.

time to read 2 min | 396 words

I was first introduced to the notion of time as an explicit concept in Udi’s writing. The notion of being able to send a message in the future was, like most of the things that Udi talks about, simple in terms of implementation, but profound in conceptual terms.

Rhino Service Bus supports this explicitly by letting you specify the time at which a message will be received. The implementation for that is quite interesting, I think.

Again, we are making use of a subqueue to minimize the additional requirements that RSB has. When we send a message to RSB with a delay, we move it to the timeout subqueue and record that fact internally. We have a timer going off every second that checks for expired messages and dispatches them when their time arrives. If the system crashes at any point, all the timeouts are still stored in the subqueue, and on startup, we read the timeout messages and reconstruct our internal representation of them.

The API is quite simple:

    bus.DelaySend(bus.Endpoint, 
        DateTime.Now.AddMilliseconds(250), 
        msg);

We might need to make some changes to support extremely large queues and extremely long durations for delayed messages, but for now, it is doing quite well.
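The internal bookkeeping described above (remember when each delayed message is due, check on a timer tick, rebuild after a crash) can be sketched roughly like this. `TimeoutIndexSketch` is a hypothetical name and the code does not touch MSMQ; it assumes that something else moves the actual messages in and out of the subqueue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory index of delayed messages; the durable copy of each
// message is assumed to live in the timeout subqueue.
public class TimeoutIndexSketch
{
    private readonly List<KeyValuePair<DateTime, string>> pending =
        new List<KeyValuePair<DateTime, string>>();
    private readonly object sync = new object();

    // Records that a message was moved to the timeout subqueue.
    public void Register(string messageId, DateTime dueAt)
    {
        lock (sync)
        {
            pending.Add(new KeyValuePair<DateTime, string>(dueAt, messageId));
        }
    }

    // Called on every timer tick: removes and returns the ids that are due,
    // earliest first, so the caller can dispatch them.
    public List<string> TakeDue(DateTime now)
    {
        lock (sync)
        {
            var due = pending
                .Where(p => p.Key <= now)
                .OrderBy(p => p.Key)
                .Select(p => p.Value)
                .ToList();
            pending.RemoveAll(p => p.Key <= now);
            return due;
        }
    }

    // On startup, rebuilds the index from what the subqueue still holds.
    public static TimeoutIndexSketch Rebuild(
        IEnumerable<KeyValuePair<DateTime, string>> stored)
    {
        var index = new TimeoutIndexSketch();
        foreach (var item in stored)
            index.Register(item.Value, item.Key);
        return index;
    }
}
```

Scanning the whole list on every tick is fine for small numbers of timeouts. For extremely large queues, a structure sorted by due time would avoid the full scan.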
