Reading a stream of JsonDocuments separated by new lines (ndjson)

 
 
Gérald Barré

Some formats require reading a stream of JSON documents separated by a character such as a newline. A common example is parsing an ndjson (newline-delimited JSON) response. This post shows how to read such a stream correctly.

A naive approach is to split the stream by the separator and parse each part. However, a JSON document may itself contain the separator character. For example, if you use \n as the separator and the document is indented, it will contain \n characters, making a simple split unreliable. The same problem applies to any other character that may appear in string values.
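Here is a small sketch, not taken from the original post, that shows the problem. It splits two `\n`-separated documents, where the first one is indented, and tries to parse each part with `JsonDocument.Parse`. The JSON literal is an assumption chosen only for illustration.

```csharp
using System;
using System.Text.Json;

// Two documents separated by '\n'. The first one is indented, so it also contains '\n'.
string ndjson = "{\n  \"Nickname\": \"meziantou\"\n}\n{\"Nickname\":\"alice\"}";

// Naive approach: split on the separator and parse each part
string[] parts = ndjson.Split('\n');
int failures = 0;
foreach (string part in parts)
{
    try
    {
        using JsonDocument document = JsonDocument.Parse(part);
    }
    catch (JsonException)
    {
        // "{", "  \"Nickname\": \"meziantou\"" and "}" are not valid documents on their own
        failures++;
    }
}

Console.WriteLine($"{parts.Length} parts, {failures} of them are not valid JSON");
```

Only the last part parses; the indented document is cut into three invalid fragments.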

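A better approach is to read the stream with a PipeReader and try to parse a JsonDocument from the data buffered so far using JsonDocument.TryParseValue. If it returns false, the buffer does not contain a complete document yet, so more data must be read before trying again. If it succeeds, the document is returned, the bytes it used are skipped, and parsing continues with the remaining data. Note that TryParseValue returns false only when it runs out of data; invalid JSON still throws a JsonException.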
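The approach relies on one behavior of JsonDocument.TryParseValue: with a Utf8JsonReader created with `isFinalBlock: false`, it returns false instead of throwing when the data ends in the middle of a document. The following sketch checks this on a truncated buffer and on a complete one. `TryParseFirst` is a hypothetical helper written for this example, and the 10-byte cut is an arbitrary way to simulate a partial read.

```csharp
using System;
using System.Text;
using System.Text.Json;

byte[] data = Encoding.UTF8.GetBytes("{\"Nickname\":\"meziantou\"}\n{\"Nickname\":\"alice\"}");

// Simulate a partial read: only the first 10 bytes ({"Nickname) have arrived
bool partialOk = TryParseFirst(data.AsSpan(0, 10), out _, out _);

// With the whole buffer, the first document is parsed and BytesConsumed
// tells where the next one starts
bool fullOk = TryParseFirst(data, out string? firstDocument, out long consumed);

Console.WriteLine($"partial: {partialOk}, full: {fullOk}, consumed: {consumed}, first: {firstDocument}");

// Hypothetical helper: parse the first JSON value in the buffer, if it is complete
static bool TryParseFirst(ReadOnlySpan<byte> buffer, out string? json, out long bytesConsumed)
{
    // isFinalBlock: false means "more data may follow", so running out of data
    // in the middle of a document makes TryParseValue return false instead of throwing
    var reader = new Utf8JsonReader(buffer, isFinalBlock: false, default);
    if (JsonDocument.TryParseValue(ref reader, out JsonDocument? document))
    {
        using (document)
        {
            json = document.RootElement.GetRawText();
        }

        bytesConsumed = reader.BytesConsumed;
        return true;
    }

    json = null;
    bytesConsumed = 0;
    return false;
}
```

In the real implementation, the number of consumed bytes is used to slice the buffer so the next call starts at the following document.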

If you are not familiar with PipeReader and System.IO.Pipelines, check out System.IO.Pipelines: High-performance IO in .NET.

C#
using System.IO.Pipelines;
using System.Text.Json;

// Create a stream with 3 JSON documents separated by a newline
var stream = new MemoryStream();
JsonSerializer.Serialize(stream, new { Nickname = "meziantou" });
stream.WriteByte((byte)'\n');
JsonSerializer.Serialize(stream, new { Nickname = "alice" });
stream.WriteByte((byte)'\n');
JsonSerializer.Serialize(stream, new { Nickname = "bob" });
stream.Position = 0;

// Read all the documents
var reader = PipeReader.Create(stream);
await foreach (var jsonDocument in JsonDocumentStream.ParseJsonDocumentStream(reader))
{
    // JsonDocument uses pooled memory, so dispose it once you are done with it
    using (jsonDocument)
    {
        // Use the JsonDocument
        Console.WriteLine(jsonDocument.RootElement);

        // Deserialize the JsonDocument
        Console.WriteLine(jsonDocument.Deserialize<User>());
    }
}

record User(string Nickname);

You need to install the System.IO.Pipelines package to use PipeReader:

Shell
dotnet add package System.IO.Pipelines

Here is the implementation of JsonDocumentStream:

C#
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Text.Json;

class JsonDocumentStream
{
    public static IAsyncEnumerable<JsonDocument> ParseJsonDocumentStream(Stream stream, byte? separator = null)
    {
        var reader = PipeReader.Create(stream);
        return ParseJsonDocumentStream(reader, separator);
    }

    public static async IAsyncEnumerable<JsonDocument> ParseJsonDocumentStream(PipeReader reader, byte? separator = null)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (!buffer.IsEmpty)
                {
                    // Support custom separator. The separator must directly follow the previous document.
                    // Whitespace, such as new lines, is ignored by JsonDocument.TryParseValue,
                    // so there is no need to trim the buffer.
                    // Only slice the local buffer here: AdvanceTo must be called once per ReadAsync.
                    if (separator.HasValue && buffer.First.Span[0] == separator.Value)
                    {
                        buffer = buffer.Slice(1);
                        continue;
                    }

                    // The buffer does not contain a complete document yet: wait for more data
                    if (!TryParseJson(ref buffer, out var jsonDocument))
                        break;

                    yield return jsonDocument;
                }

                // End of the stream. Incomplete trailing data, if any, is ignored.
                if (result.IsCompleted)
                    break;

                // Remove the processed data from the buffer and mark the remaining data as examined,
                // so the next ReadAsync waits for more data instead of returning the same bytes
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
        finally
        {
            // Runs even if the consumer stops the enumeration early
            await reader.CompleteAsync().ConfigureAwait(false);
        }

        // Cannot be inline as the method is async, so Utf8JsonReader cannot be instantiated (ref struct)
        static bool TryParseJson(ref ReadOnlySequence<byte> buffer, [NotNullWhen(true)] out JsonDocument? jsonDocument)
        {
            // isFinalBlock: false, so an incomplete document makes TryParseValue return false instead of throwing
            var reader = new Utf8JsonReader(buffer, isFinalBlock: false, default);
            if (JsonDocument.TryParseValue(ref reader, out jsonDocument))
            {
                buffer = buffer.Slice(reader.BytesConsumed);
                return true;
            }

            return false;
        }
    }
}
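When a document is incomplete, the implementation calls `AdvanceTo(buffer.Start, buffer.End)`: nothing after the last parsed document is consumed, but everything is marked as examined, so the next ReadAsync waits for new bytes and returns them after the unconsumed ones. The following sketch, an illustration rather than part of the original code, checks this with an in-memory `Pipe` and a document written in two chunks. Like the rest of the post, it needs the System.IO.Pipelines package.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();

// The first chunk contains only the beginning of a document
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("{\"Nickname\":"));

ReadResult firstRead = await pipe.Reader.ReadAsync();
long firstLength = firstRead.Buffer.Length;

// Consume nothing, examine everything: the next ReadAsync waits for new data
pipe.Reader.AdvanceTo(firstRead.Buffer.Start, firstRead.Buffer.End);

// The rest of the document arrives, then the writer completes
await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("\"alice\"}"));
await pipe.Writer.CompleteAsync();

// The unconsumed bytes are returned again, followed by the new ones
ReadResult secondRead = await pipe.Reader.ReadAsync();
string text = Encoding.UTF8.GetString(secondRead.Buffer.ToArray());
bool isCompleted = secondRead.IsCompleted;

pipe.Reader.AdvanceTo(secondRead.Buffer.End);
await pipe.Reader.CompleteAsync();

Console.WriteLine($"{firstLength} bytes, then {text} (completed: {isCompleted})");
```

If the code passed `buffer.Start` as both the consumed and examined positions instead, ReadAsync would return the same incomplete bytes immediately, and the parsing loop would spin without making progress.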
