Scrolling documents

The scroll API can be used to return a large collection of documents from Elasticsearch.

NEST exposes the scroll API and an observable scroll implementation that can be used to write concurrent scroll requests.

Simple use

The simplest use of the scroll API is to perform a search request with a scroll timeout, then pass the scroll id returned in each response to the next scroll request, until no more documents are returned.

var searchResponse = Client.Search<Project>(s => s
    .Query(q => q
        .Term(f => f.State, StateOfBeing.Stable)
    )
    .Scroll("10s") 
);

while (searchResponse.Documents.Any()) 
{
    ProcessResponse(searchResponse); 
    searchResponse = Client.Scroll<Project>("10s", searchResponse.ScrollId);
}

The Scroll("10s") call specifies how long Elasticsearch should keep this scroll open on the server side. The time specified should be sufficient to process the response on the client side.

The while loop makes subsequent requests to the scroll API to keep fetching documents for as long as documents are returned.

ProcessResponse stands in for whatever should be done with each response.

ScrollAllObservable

Similar to BulkAllObservable for bulk indexing a large number of documents, NEST exposes an observable scroll implementation, ScrollAllObservable, that can be used to write concurrent scroll requests. ScrollAllObservable uses sliced scrolls to split the scroll into multiple slices that can be consumed concurrently.

The simplest use of ScrollAllObservable is:

int numberOfSlices = Environment.ProcessorCount; 

var scrollAllObservable = Client.ScrollAll<Project>("10s", numberOfSlices, sc => sc
    .MaxDegreeOfParallelism(numberOfSlices) 
    .Search(s => s
        .Query(q => q
            .Term(f => f.State, StateOfBeing.Stable)
        )
    )
);

scrollAllObservable.Wait(TimeSpan.FromMinutes(10), response => 
{
    ProcessResponse(response.SearchResponse); 
});

For numberOfSlices, see the sliced scroll documentation for choosing an appropriate number of slices.

MaxDegreeOfParallelism sets the number of concurrent sliced scroll requests. It is usually set to the same value as the number of slices.

The TimeSpan passed to Wait is the total overall time allowed for scrolling all documents. Ensure this is long enough to scroll all documents.

ProcessResponse stands in for whatever should be done with each response.

More control over how the observable is consumed can be achieved by writing your own observer and subscribing to the observable, which will initiate scrolling:

int numberOfSlices = Environment.ProcessorCount;

var scrollAllObservable = Client.ScrollAll<Project>("10s", numberOfSlices, sc => sc
    .MaxDegreeOfParallelism(numberOfSlices)
    .Search(s => s
        .Query(q => q
            .Term(f => f.State, StateOfBeing.Stable)
        )
    )
);

var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo info = null;

var scrollAllObserver = new ScrollAllObserver<Project>(
    onNext: response => ProcessResponse(response.SearchResponse), 
    onError: e =>
    {
        info = ExceptionDispatchInfo.Capture(e); 
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set()
);

scrollAllObservable.Subscribe(scrollAllObserver); 

waitHandle.WaitOne(); 
info?.Throw(); 

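The onNext handler does something with each response.

If an exception is thrown, the onError handler captures it so that it can be thrown outside of the observer.

The call to Subscribe initiates scrolling.

WaitOne blocks the current thread until the wait handle is set, either on completion or on error.

If an exception was captured whilst scrolling, info?.Throw() rethrows it on the calling thread.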
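
The capture-and-rethrow handling in the observer example is plain .NET and can be tried without NEST or a running cluster. The following is a minimal sketch, not NEST code: RunAndWait is a hypothetical helper, and a thread-pool work item stands in for the observer callbacks. It shows the same three steps: capture the exception where it occurs, signal the wait handle, and rethrow on the waiting thread.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading;

public static class CaptureAndRethrow
{
    // Hypothetical helper (not part of NEST): runs `work` on a thread-pool
    // thread, blocks the caller until it finishes, and rethrows any exception
    // on the calling thread.
    public static void RunAndWait(Action work)
    {
        using (var waitHandle = new ManualResetEvent(false))
        {
            ExceptionDispatchInfo info = null;

            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    work();
                }
                catch (Exception e)
                {
                    // Capture rather than let the exception escape the
                    // background thread; the original stack trace is kept.
                    info = ExceptionDispatchInfo.Capture(e);
                }
                finally
                {
                    // Signal the waiting thread on success and on failure.
                    waitHandle.Set();
                }
            });

            // Block the current thread until the work item has signalled.
            waitHandle.WaitOne();

            // If an exception was captured, rethrow it here.
            info?.Throw();
        }
    }

    public static void Main()
    {
        RunAndWait(() => Console.WriteLine("completed without error"));

        try
        {
            RunAndWait(() => throw new InvalidOperationException("scroll failed"));
        }
        catch (InvalidOperationException e)
        {
            Console.WriteLine("rethrown on caller: " + e.Message);
        }
    }
}
```

Using ExceptionDispatchInfo rather than storing the exception and writing throw e keeps the stack trace from the point of failure, which is usually what is wanted when diagnosing a failed scroll.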