Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

time to read 4 min | 764 words

I previously asked what the code below does, and mentioned that it should give interesting insight into the kind of mindset and knowledge a candidate has. Take a look at the code again:


#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/stat.h>


#define BUFFER_SIZE (3ULL * 1024 * 1024 * 1024) // 3GB in bytes


int main() {
    int fd;
    char *buffer;
    struct stat st;


    buffer = (char *)malloc(BUFFER_SIZE);
    if (buffer == NULL) {
        return 1;
    }


    fd = open("large_file.bin", O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
    if (fd == -1) {
        return 2;
    }


    if (write(fd, buffer, BUFFER_SIZE) == -1) {
        return 3;
    }


    if (fsync(fd) == -1) {
        return 4;
    }


    if (close(fd) == -1) {
        return 5;
    }


    if (stat("large_file.bin", &st) == -1) {
        return 6;
    }


    printf("File size: %.2f GB\n", (double)st.st_size / (1024 * 1024 * 1024));


    free(buffer);
    return 0;
}

This program will output: File size: 2.00 GB

And it will write 2 GB of zeros to the file:


~$ head  large_file.bin  | hexdump -C
00000000  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
*
7ffff000

The question is why? And the answer is quite simple. On Linux, a single write() call will transfer at most 0x7ffff000 bytes (2,147,479,552 bytes, just under 2 GiB); note that this is exactly the final offset the hexdump shows. Any write call that attempts to write more than that will only write that much, and you'll have to call the system again for the rest. This is not an error, mind. The write call is always free to write less than the size of the buffer you passed to it.

Windows has the same limit, but it is honest about it

In Windows, WriteFile() takes the number of bytes to write as a 32-bit DWORD, so this limitation is clearly communicated in the API. Windows will also ensure that, for files, a WriteFile call that completes successfully has written the entire buffer to the disk.

And why am I writing 2 GB of zeros? In the code above, I'm using malloc(), not calloc(), so I wouldn't expect the values to be zero. But because this is a very large allocation, malloc() doesn't carve it out of the existing heap; it asks the OS for fresh pages directly (with glibc, via mmap()), and the OS is contractually obligated to provide us with zeroed pages, since handing out another process's old data would be a security hole.

time to read 1 min | 128 words

I posted this code previously:

And asked what it prints. This is actually an infinite loop that will print an endless amount of zeros to the console. The question is why.

The answer is that we are running into two separate features of C# that interact with each other in a surprising way.

The issue is that we are storing a struct enumerator in a nullable variable and accessing it through the Value property. The problem is that this is a struct, and every read of the Value property returns a copy of it.

So the way it works, the code actually runs:

And now you can more easily see the temporary copies that are created: because we are using a value type here, every access operates on a different instance.
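The snippet under discussion isn't reproduced here, but the behavior is easy to reconstruct. The following is my own minimal sketch, assuming a List&lt;int&gt;.Enumerator stored in a nullable variable; without the guard counter (which I added so the demo terminates) it loops forever, printing zeros:

```csharp
using System;
using System.Collections.Generic;

class Program
{
    static void Main()
    {
        var items = new List<int> { 1, 2, 3 };
        List<int>.Enumerator? e = items.GetEnumerator();

        // e.Value returns a *copy* of the struct enumerator, so MoveNext()
        // advances the copy, never the stored value: the condition is
        // always true, and without the guard this would loop forever.
        int guard = 0;
        while (e.Value.MoveNext())
        {
            // yet another fresh copy, which has never moved: Current is 0
            Console.WriteLine(e.Value.Current);
            if (++guard == 5) break; // guard added so the demo terminates
        }
    }
}
```

Each e.Value is a brand-new copy of the enumerator at its initial position, which is why Current is always the default value.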

time to read 2 min | 366 words

I asked the following question, about code that uses AsyncLocal as well as async calls. Here is the code again:

This code prints False twice; the question is why. I would expect the AsyncLocal value to remain the same after the call to Start(), since that is obviously the point of AsyncLocal. It turns out that this isn't the case.

AsyncLocal is good if you are trying to pass a value down to child tasks, but it won't propagate to other tasks invoked at the same level. In other words, it works for child tasks, not sibling tasks. This is actually even more surprising in the code above, since we don't do any awaits in the Start() method.

The question is why? Looking at the documentation, I couldn’t see any reason for that. Digging deeper into the source code, I figured out what is going on.

We can use SharpLab.io to lower the high level C# code to see what is actually going on here, which gives us the following code for the Start() method:

Note the call to the AsyncTaskMethodBuilder.Start() method, which ends up in AsyncMethodBuilderCore.Start(). There we have some interesting code; in particular, we capture the current thread's execution context before executing user code, and after the user code is done running, we restore the previous context if it changed.

That looks fine, but why would the execution context change here? It turns out that one of the few places that interacts with it is AsyncLocal itself, which ends up in ExecutionContext.SetLocalValue. The way it works, each time you set an async local, it creates a new layer in the execution context. And when you exit an async call, the context is reset to the place it was at before the async call started.

In other words, the local in the name AsyncLocal isn't analogous to ThreadLocal; it is closer to a local variable, which goes out of scope on function exit.

This isn’t a new thing, and there are workarounds, but it was interesting enough that I decided to dig deep and understand what is actually going on.
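Here is a minimal self-contained sketch of that behavior (my own reconstruction, not the original code from the post): a value set inside an async method does not survive the method's return.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static readonly AsyncLocal<string> Local = new AsyncLocal<string>();

    static async Task Start()
    {
        // Setting the value pushes a new layer onto the execution context...
        Local.Value = "set inside Start";
        await Task.Yield();
    }

    static async Task Main()
    {
        await Start();
        // ...but the caller's context is restored when Start() returns,
        // so the write behaves like a local that went out of scope.
        Console.WriteLine(Local.Value == "set inside Start"); // False
    }
}
```

The usual workaround is to set the AsyncLocal in the outer scope and let the child methods read (or mutate a mutable holder inside) it, rather than assigning the value itself from within the child.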

time to read 3 min | 407 words

I asked why this code is broken, and now is the time to dig into this. The issue is in this block of code. Take a look at that for a moment, if you please:

The code is trying to gather the async uploads of all the files, and then await them. This code compiles and runs successfully, but it will not do what you expect it to do. Let's break it up a bit to understand what is going on:

We are adding the variable task to our list of tasks; we just extracted a variable, nothing much going on here. Let's explore further: what is the type of task? We know it must be a subtype of Task, since that is what the tasks collection accepts. It turns out that this isn't that simple:

What is that Task<Task> thing? Well, let's look at the signature of Task.Factory.StartNew(), shall we?

public Task<TResult> StartNew<TResult>(Func<TResult> function);

Just from the signature, we can tell what is going on. StartNew() accepts a function that returns a value and will then return a task for the eventual result of this function. However, the function we are actually passing to the StartNew() doesn’t produce a value. Except that it does…

Let’s explore that thing for a bit:

var func = async () => { };

What is the type of func in this case?

Func<Task> func = async() => {};

The idea is that when the compiler sees the async keyword, it transforms the function into one that returns a Task. Combining those two features means that our original code merely kicks off the asynchronous process and returns a task that completes as soon as the process has started. Basically, we'll only wait for the actual opening of the file, not for the network work that has to happen here.

The right way to express what we want here is:

Task.Run(async () => {});

The signature for this is:

public static Task Run(Func<Task> function);

You can see that here, too, we pass a function that returns a task, but Task.Run unwraps it instead of nesting it inside another Task instance. The task that is returned will complete only once the full work has been completed.

It is an interesting pitfall in the API, and can be quite hard to figure out exactly what is going on. Mostly because there are several different things happening all at once.
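To make the pitfall concrete, here is a small self-contained sketch (my own illustration, not code from the post) showing the nested task and what Unwrap(), which Task.Run does for you, changes:

```csharp
using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        bool done = false;

        // The async lambda is a Func<Task>, so StartNew<TResult> infers
        // TResult = Task and returns a nested Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(async () =>
        {
            await Task.Delay(100);
            done = true;
        });

        // Awaiting the outer task only waits for the lambda to run until
        // its first await and hand back the inner task.
        await outer;
        Console.WriteLine(done); // almost certainly still False here

        // Unwrap() gives a task that tracks the inner work to completion.
        await outer.Unwrap();
        Console.WriteLine(done); // True
    }
}
```

Task.Run(Func&lt;Task&gt;) is exactly the StartNew + Unwrap combination, which is why it is the right tool when the work item is itself asynchronous.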

time to read 2 min | 225 words

This one was a surprise; the code seems so simple, but it does something very different from what I would expect.

The question is why?

As it turns out, we are missing one character here:

[image: the code snippet, shown as an image in the original post]

Notice the lack of the comma? Let us see how the compiler treats this code by breaking it apart into steps, shall we?

First, let us break it into two statements:

Note that we moved the name line into the first statement, since there isn't a comma there. But what is it actually doing? This looks very strange, but that is just because we have the dictionary initializer here. If we drop it, we get:

[image: the simplified code, shown as an image in the original post]

And that makes a lot more sense. If we break it all down, we have:

And that explains it all; it makes perfect sense, and it is a very nasty trap. We ran into it accidentally in production code, and it was nearly impossible to figure out what was going on or why it happened.
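The original snippet only survives as an image, so here is a compiling illustration (mine, not the post's code) of the same class of trap: a missing separator makes a bracketed expression parse as an indexer on the expression before it.

```csharp
using System;

class Program
{
    static void Main()
    {
        // This looks like an array assignment followed by a stray line,
        // but without a separator the compiler reads it as one expression:
        // the [1] is an indexer applied to the freshly created array.
        var value = new[] { 1, 2, 3 }
        [1];

        Console.WriteLine(value); // 2
    }
}
```

The code is perfectly legal, which is exactly why the compiler gives you no warning and the bug survives review.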

time to read 12 min | 2372 words

In RavenDB, we had this piece of code:

        internal T[] LoadInternal<T>(string[] ids, string[] includes)
        {
            if(ids.Length == 0)
                return new T[0];

            IncrementRequestCount();
            Debug.WriteLine(string.Format("Bulk loading ids [{0}] from {1}", string.Join(", ", ids), StoreIdentifier));
            MultiLoadResult multiLoadResult;
            JsonDocument[] includeResults;
            JsonDocument[] results;
#if !SILVERLIGHT
            var sp = Stopwatch.StartNew();
#else
            var startTime = DateTime.Now;
#endif
            bool firstRequest = true;
            do
            {
                IDisposable disposable = null;
                if (firstRequest == false) // if this is a repeated request, we mustn't use the cached result, but have to re-query the server
                    disposable = DatabaseCommands.DisableAllCaching();
                using (disposable)
                    multiLoadResult = DatabaseCommands.Get(ids, includes);

                firstRequest = false;
                includeResults = SerializationHelper.RavenJObjectsToJsonDocuments(multiLoadResult.Includes).ToArray();
                results = SerializationHelper.RavenJObjectsToJsonDocuments(multiLoadResult.Results).ToArray();
            } while (
                AllowNonAuthoritiveInformation == false &&
                results.Any(x => x.NonAuthoritiveInformation ?? false) &&
#if !SILVERLIGHT
                sp.Elapsed < NonAuthoritiveInformationTimeout
#else 
                (DateTime.Now - startTime) < NonAuthoritiveInformationTimeout
#endif
                );

            foreach (var include in includeResults)
            {
                TrackEntity<object>(include);
            }

            return results
                .Select(TrackEntity<T>)
                .ToArray();
        }

And we needed to take this same piece of code and execute it in:

  • Async fashion
  • As part of a batch of queries (sending multiple requests to RavenDB in a single HTTP call).

Everything else is the same, but in each case the marked line is completely different.

I chose to address this by doing a Method Object refactoring. I created a new class, moved all the local variables to fields, and moved each part of the method into its own method. I also explicitly gave up control of execution, deferring that to whoever is calling us. We ended up with this:

    public class MultiLoadOperation
    {
        private static readonly Logger log = LogManager.GetCurrentClassLogger();

        private readonly InMemoryDocumentSessionOperations sessionOperations;
        private readonly Func<IDisposable> disableAllCaching;
        private string[] ids;
        private string[] includes;
        bool firstRequest = true;
        IDisposable disposable = null;
        JsonDocument[] results;
        JsonDocument[] includeResults;
                
#if !SILVERLIGHT
        private Stopwatch sp;
#else
        private    DateTime startTime;
#endif

        public MultiLoadOperation(InMemoryDocumentSessionOperations sessionOperations, 
            Func<IDisposable> disableAllCaching,
            string[] ids, string[] includes)
        {
            this.sessionOperations = sessionOperations;
            this.disableAllCaching = disableAllCaching;
            this.ids = ids;
            this.includes = includes;
        
            sessionOperations.IncrementRequestCount();
            log.Debug("Bulk loading ids [{0}] from {1}", string.Join(", ", ids), sessionOperations.StoreIdentifier);

#if !SILVERLIGHT
            sp = Stopwatch.StartNew();
#else
            startTime = DateTime.Now;
#endif
        }

        public IDisposable EnterMultiLoadContext()
        {
            if (firstRequest == false) // if this is a repeated request, we mustn't use the cached result, but have to re-query the server
                disposable = disableAllCaching();
            return disposable;
        }

        public bool SetResult(MultiLoadResult multiLoadResult)
        {
            firstRequest = false;
            includeResults = SerializationHelper.RavenJObjectsToJsonDocuments(multiLoadResult.Includes).ToArray();
            results = SerializationHelper.RavenJObjectsToJsonDocuments(multiLoadResult.Results).ToArray();

            return    sessionOperations.AllowNonAuthoritiveInformation == false &&
                    results.Any(x => x.NonAuthoritiveInformation ?? false) &&
#if !SILVERLIGHT
                    sp.Elapsed < sessionOperations.NonAuthoritiveInformationTimeout
#else 
                    (DateTime.Now - startTime) < sessionOperations.NonAuthoritiveInformationTimeout
#endif
                ;
        }

        public T[] Complete<T>()
        {
            foreach (var include in includeResults)
            {
                sessionOperations.TrackEntity<object>(include);
            }

            return results
                .Select(sessionOperations.TrackEntity<T>)
                .ToArray();
        }
    }

Note that this class doesn’t contain two very important things:

  • The actual call to the database, we gave up control on that.
  • The execution order for the methods, we don’t control that either.

That was ugly, and I decided that since I have to write another implementation as well, I might as well do the right thing and have a shared implementation. The key was to extract everything away except for the call that gets the actual value. So I did just that, and we got a new class that does all of the functionality above, except controlling where and how the actual call to the server is made.

Now, for the sync version, we have this code:

internal T[] LoadInternal<T>(string[] ids, string[] includes)
{
    if(ids.Length == 0)
        return new T[0];

    var multiLoadOperation = new MultiLoadOperation(this, DatabaseCommands.DisableAllCaching, ids, includes);
    MultiLoadResult multiLoadResult;
    do
    {
        using(multiLoadOperation.EnterMultiLoadContext())
        {
            multiLoadResult = DatabaseCommands.Get(ids, includes);
        }
    } while (multiLoadOperation.SetResult(multiLoadResult));

    return multiLoadOperation.Complete<T>();
}

This isn’t the most trivial of methods, I’ll admit, but it is ever so much better than the alternative, especially since now the async version looks like:

/// <summary>
/// Begins the async multi load operation
/// </summary>
public Task<T[]> LoadAsyncInternal<T>(string[] ids, string[] includes)
{
    var multiLoadOperation = new MultiLoadOperation(this,AsyncDatabaseCommands.DisableAllCaching, ids, includes);
    return LoadAsyncInternal<T>(ids, includes, multiLoadOperation);
}

private Task<T[]> LoadAsyncInternal<T>(string[] ids, string[] includes, MultiLoadOperation multiLoadOperation)
{
    using (multiLoadOperation.EnterMultiLoadContext())
    {
        return AsyncDatabaseCommands.MultiGetAsync(ids, includes)
            .ContinueWith(t =>
            {
                if (multiLoadOperation.SetResult(t.Result) == false)
                    return Task.Factory.StartNew(() => multiLoadOperation.Complete<T>());
                return LoadAsyncInternal<T>(ids, includes, multiLoadOperation);
            })
            .Unwrap();
    }
}

Again, it isn't trivial, but at least the core logic, the part that isn't related to how we execute the code, is shared.

time to read 6 min | 1068 words

Originally posted at 4/19/2011

Yesterday I posted the following challenge:

Given the following API, can you think of a way that would prevent memory leaks?

public interface IBufferPool
{
    byte[] TakeBuffer(int size);
    void ReturnBuffer(byte[] buffer);
}

The problem with having something like this is that forgetting to return the buffer is going to cause a memory leak. Instead of having that I would like to have the application stop if a buffer is leaked. Leaked means that no one is referencing this buffer but it wasn’t returned to the pool.

What I would really like is that when running in debug mode, leaking a buffer would stop the entire application and tell me:

  • That a buffer was leaked.
  • What was the stack trace that allocated that buffer.

Let us take a look at how we are going about implementing this, shall we? I am going to defer the actual implementation of the buffer pool to System.ServiceModel.Channels.BufferManager and focus on providing the anti-leak features. The result is that this code:

IBufferPool pool = new BufferPool(1024*512, 1024);

var buffer = pool.TakeBuffer(512);
GC.Collect(); // force a collection so any pending finalizers are queued
GC.WaitForPendingFinalizers(); // nothing here

pool.ReturnBuffer(buffer);
buffer = null;
GC.Collect();
GC.WaitForPendingFinalizers(); // nothing here, we released the memory properly

pool.TakeBuffer(512); // take and discard a buffer without returning it to the pool
GC.Collect();
GC.WaitForPendingFinalizers(); // failure!

Will result in the following error:

Unhandled Exception: System.InvalidOperationException: A buffer was leaked. Initial allocation:
   at ConsoleApplication1.BufferPool.BufferTracker.TrackAllocation() in IBufferPool.cs:line 22
   at ConsoleApplication1.BufferPool.TakeBuffer(Int32 size) in IBufferPool.cs:line 60
   at ConsoleApplication1.Program.Main(String[] args) in Program.cs:line 21

And now for the implementation:

public class BufferPool : IBufferPool
{
    public class BufferTracker
    {
        private StackTrace stackTrace;

        public void TrackAllocation()
        {
            stackTrace = new StackTrace(true);
            GC.ReRegisterForFinalize(this);
        }

        public void Discard()
        {
            stackTrace = null;
            GC.SuppressFinalize(this);
        }

        ~BufferTracker()
        {
            if (stackTrace == null)
                return;

            throw new InvalidOperationException(
                "A buffer was leaked. Initial allocation:" + Environment.NewLine + stackTrace
                );
        }
    }

    private readonly BufferManager bufferManager;
    private ConditionalWeakTable<byte[], BufferTracker> trackLeakedBuffers = new ConditionalWeakTable<byte[], BufferTracker>();

    public BufferPool(long maxBufferPoolSize, int maxBufferSize)
    {
        bufferManager = BufferManager.CreateBufferManager(maxBufferPoolSize, maxBufferSize);
    }

    public void Dispose()
    {
        bufferManager.Clear();
        // note that disposing the pool before returning all of the buffers will cause a crash
    }

    public byte[] TakeBuffer(int size)
    {
        var buffer = bufferManager.TakeBuffer(size);
        trackLeakedBuffers.GetOrCreateValue(buffer).TrackAllocation();
        return buffer;
    }

    public void ReturnBuffer(byte[] buffer)
    {
        BufferTracker value;
        if(trackLeakedBuffers.TryGetValue(buffer, out value))
        {
            value.Discard();
        }
        bufferManager.ReturnBuffer(buffer);
    }
}

As you can see, utilizing ConditionalWeakTable is quite powerful, since it allows us to support a lot of really advanced scenarios in a fairly simple way.

time to read 4 min | 769 words

Originally posted at 12/15/2010

Yesterday I asked what is wrong with the following code:

public ISet<string> GetTerms(string index, string field)
{
    if(field == null) throw new ArgumentNullException("field");
    if(index == null) throw new ArgumentNullException("index");
    
    var result = new HashSet<string>();
    var currentIndexSearcher = database.IndexStorage.GetCurrentIndexSearcher(index);
    IndexSearcher searcher;
    using(currentIndexSearcher.Use(out searcher))
    {
        var termEnum = searcher.GetIndexReader().Terms(new Term(field));
        while (field.Equals(termEnum.Term().Field()))
        {
           result.Add(termEnum.Term().Text());

            if (termEnum.Next() == false)
                break;
        }
    }

    return result;
}

The answer to that is quite simple: this code doesn't have any paging. What this means is that if we execute this piece of code on a field with a very high number of unique items (such as, for example, email addresses), we would return all the results in one shot. That is, if we can actually fit all of them into memory. Anything that can run over a potentially unbounded result set should have paging as part of its basic API.

This is not optional.

Here is the correct piece of code:

public ISet<string> GetTerms(string index, string field, string fromValue, int pageSize)
{
    if(field == null) throw new ArgumentNullException("field");
    if(index == null) throw new ArgumentNullException("index");
    
    var result = new HashSet<string>();
    var currentIndexSearcher = database.IndexStorage.GetCurrentIndexSearcher(index);
    IndexSearcher searcher;
    using(currentIndexSearcher.Use(out searcher))
    {
        var termEnum = searcher.GetIndexReader().Terms(new Term(field, fromValue ?? string.Empty));
        if (string.IsNullOrEmpty(fromValue) == false)// need to skip this value
        {
            while(fromValue.Equals(termEnum.Term().Text()))
            {
                if (termEnum.Next() == false)
                    return result;
            }
        }
        while (field.Equals(termEnum.Term().Field()))
        {
            result.Add(termEnum.Term().Text());

            if (result.Count >= pageSize)
                break;

            if (termEnum.Next() == false)
                break;
        }
    }

    return result;
}

And that is quite efficient, even for searching large data sets.

For bonus points, the calling code ensures that pageSize cannot be too big :-)

time to read 4 min | 616 words

Originally posted at 12/15/2010

Well, the problem with our last answer was that we didn't protect ourselves from multi-threaded access to the slots variable. Here is the code with this fixed:

public class CloseableThreadLocal
{
    [ThreadStatic]
    public static Dictionary<object, object> slots;

    private readonly object holder = new object();
    private Dictionary<object, object> capturedSlots;

    private Dictionary<object, object> Slots
    {
        get
        {
            if (slots == null)
                slots = new Dictionary<object, object>();
            capturedSlots = slots;
            return slots;
        }
    }


    public /*protected internal*/ virtual Object InitialValue()
    {
        return null;
    }

    public virtual Object Get()
    {
        object val;

        lock (Slots)
        {
            if (Slots.TryGetValue(holder, out val))
            {
                return val;
            }
        }
        val = InitialValue();
        Set(val);
        return val;
    }

    public virtual void Set(object val)
    {
        lock (Slots)
        {
            Slots[holder] = val;
        }
    }

    public virtual void Close()
    {
        GC.SuppressFinalize(this);
        if (capturedSlots != null)
            capturedSlots.Remove(this);
    }

    ~CloseableThreadLocal()
    {
        if (capturedSlots == null)
            return;
        lock (capturedSlots)
            capturedSlots.Remove(holder);
    }
}

Is this it? Are there still issues that we need to handle?

time to read 3 min | 461 words

As it turns out, there are a LOT of issues with this code:

public class QueueActions : IDisposable
{
    UnmanagedDatabaseConnection database;
    public string Name { get; private set; }

    public QueueActions(UnmanagedDatabaseConnectionFactory factory)
    {
         database = factory.Create();
         database.Open(()=> Name = database.ReadName());
    }

   // assume proper GC finalizer impl

    public void Dispose()
    {
          database.Dispose();
    }
}

And the code using this:

using(var factory = CreateFactory())
{
   ThreadPool.QueueUserWorkItem(()=>
   {
          using(var actions = new QueueActions(factory))
          {
               actions.Send("abc");     
          }
    });
}

To begin with, what happens if we close the factory between the first and second lines of the QueueActions constructor?

We have already acquired an unmanaged resource, but when we try to open it, we are going to get an exception. Since the exception is thrown from the constructor, the usual using logic will NOT kick in, and the object will never be disposed.
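Here is a minimal sketch (with hypothetical names, not the actual Rhino.Queues code) of why the using statement offers no protection when the constructor throws:

```csharp
using System;

class QueueActionsSketch : IDisposable
{
    public static bool Disposed;

    public QueueActionsSketch()
    {
        // imagine the unmanaged resource acquired here, then Open() fails:
        throw new InvalidOperationException("open failed");
    }

    public void Dispose() => Disposed = true;
}

class Program
{
    static void Main()
    {
        try
        {
            // 'using' only protects the scope *after* the constructor
            // returns; an exception inside it skips Dispose() entirely.
            using (var actions = new QueueActionsSketch()) { }
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine(QueueActionsSketch.Disposed); // False
        }
    }
}
```

The using statement expands to a try/finally that begins only after the expression has produced a value, so a throwing constructor leaves any already-acquired resources orphaned.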

Furthermore, and this is the reason for this blog post: Dispose itself can also fail.

Here is the actual stack trace that caused this blog post:

Microsoft.Isam.Esent.Interop.EsentErrorException: Error TermInProgress (JET_errTermInProgress, Termination in progress)
at Microsoft.Isam.Esent.Interop.Api.Check(Int32 err) in Api.cs: line 1492
at Microsoft.Isam.Esent.Interop.Api.JetCloseTable(JET_SESID sesid, JET_TABLEID tableid) in Api.cs: line 372
at Microsoft.Isam.Esent.Interop.Table.ReleaseResource() in D:\Work\esent\EsentInterop\Table.cs: line 97
at Microsoft.Isam.Esent.Interop.EsentResource.Dispose() in EsentResource.cs: line 63
at Rhino.Queues.Storage.AbstractActions.Dispose() in AbstractActions.cs: line 146 
