Scrolling documents
The scroll API can be used to return a large collection of documents from Elasticsearch.
NEST exposes the scroll API and an observable scroll implementation that can be used to write concurrent scroll requests.
Simple use
The simplest use of the scroll API is to perform a search request with a scroll timeout, then pass the scroll ID returned in each response to the next request to the scroll API, until no more documents are returned.
var searchResponse = Client.Search<Project>(s => s
    .Query(q => q
        .Term(f => f.State, StateOfBeing.Stable)
    )
    // How long Elasticsearch should keep this scroll open on the server side.
    // The time should be long enough to process each response on the client side.
    .Scroll("10s")
);

// Keep making requests to the scroll API for as long as documents are returned
while (searchResponse.Documents.Any())
{
    ProcessResponse(searchResponse); // do something with the response
    searchResponse = Client.Scroll<Project>("10s", searchResponse.ScrollId);
}
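The loop's termination rule ("process the current page, scroll again with the latest scroll ID, stop on an empty page") can be checked without a cluster. The sketch below is a hypothetical in-memory model: FakeScrollServer and ScrollLoop are illustrative names, not NEST types. In Elasticsearch the scroll ID may change between responses, which is why the loop always passes the ID from the most recent response; this fake happens to keep it constant.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a server-side scroll context (not part of NEST).
public sealed class FakeScrollServer
{
    private readonly IReadOnlyList<int> _documents;
    private readonly int _pageSize;
    private readonly Dictionary<string, int> _cursors = new Dictionary<string, int>();

    public FakeScrollServer(IReadOnlyList<int> documents, int pageSize)
    {
        _documents = documents;
        _pageSize = pageSize;
    }

    // Models the initial search request: opens a cursor and returns the first page.
    public (string ScrollId, IReadOnlyList<int> Documents) Search()
    {
        var scrollId = Guid.NewGuid().ToString("N");
        _cursors[scrollId] = 0;
        return Scroll(scrollId);
    }

    // Models a scroll request: returns the next page and advances the cursor.
    public (string ScrollId, IReadOnlyList<int> Documents) Scroll(string scrollId)
    {
        var offset = _cursors[scrollId];
        var page = _documents.Skip(offset).Take(_pageSize).ToList();
        _cursors[scrollId] = offset + page.Count;
        return (scrollId, page);
    }
}

public static class ScrollLoop
{
    // Same shape as the NEST loop: process the page, then request the next one
    // using the scroll ID from the latest response, until a page comes back empty.
    public static List<int> ReadAll(FakeScrollServer server)
    {
        var processed = new List<int>();
        var response = server.Search();
        while (response.Documents.Any())
        {
            processed.AddRange(response.Documents);
            response = server.Scroll(response.ScrollId);
        }
        return processed;
    }
}
```

With 25 documents and a page size of 10, the loop sees pages of 10, 10 and 5 documents, then an empty page that ends it.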
ScrollAllObservable
Similar to BulkAllObservable for bulk indexing a large number of documents, NEST exposes an observable scroll implementation, ScrollAllObservable, that can be used to write concurrent scroll requests. ScrollAllObservable uses sliced scrolls to split the scroll into multiple slices that can be consumed concurrently.
The simplest use of ScrollAllObservable is as follows:
// See the sliced scroll documentation for choosing an appropriate number of slices
int numberOfSlices = Environment.ProcessorCount;

var scrollAllObservable = Client.ScrollAll<Project>("10s", numberOfSlices, sc => sc
    // Number of concurrent sliced scroll requests; usually set to the number of slices
    .MaxDegreeOfParallelism(numberOfSlices)
    .Search(s => s
        .Query(q => q
            .Term(f => f.State, StateOfBeing.Stable)
        )
    )
);

// Total overall time allowed for scrolling all documents; make sure it is long enough
scrollAllObservable.Wait(TimeSpan.FromMinutes(10), response =>
{
    ProcessResponse(response.SearchResponse); // do something with the response
});
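To make the slicing idea concrete without a cluster, here is a small in-memory model. SlicedScrollModel and SliceOf are hypothetical names, and the "id modulo slice count" rule is only a stand-in: Elasticsearch decides slice membership on the server. The model captures the two properties the example above relies on: slices are disjoint and together cover every document, and at most MaxDegreeOfParallelism slices are consumed at once (here via Parallel.For with ParallelOptions).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SlicedScrollModel
{
    // Hypothetical slice assignment; Elasticsearch computes slices itself.
    public static int SliceOf(int documentId, int numberOfSlices) =>
        documentId % numberOfSlices;

    public static IReadOnlyCollection<(int Slice, int DocumentId)> ConsumeAll(
        IReadOnlyList<int> documentIds, int numberOfSlices, int maxDegreeOfParallelism)
    {
        var results = new ConcurrentBag<(int Slice, int DocumentId)>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        // One iteration per slice; at most maxDegreeOfParallelism slices run concurrently.
        Parallel.For(0, numberOfSlices, options, slice =>
        {
            foreach (var id in documentIds.Where(d => SliceOf(d, numberOfSlices) == slice))
            {
                results.Add((slice, id));
            }
        });

        return results;
    }
}
```

Setting the degree of parallelism equal to the slice count, as the ScrollAll example does, lets every slice run at the same time; a lower value would queue some slices behind others.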
More control over how the observable is consumed can be achieved by writing your own observer and subscribing it to the observable. Subscribing initiates scrolling:
using System.Runtime.ExceptionServices;
using System.Threading;

int numberOfSlices = Environment.ProcessorCount;

var scrollAllObservable = Client.ScrollAll<Project>("10s", numberOfSlices, sc => sc
    .MaxDegreeOfParallelism(numberOfSlices)
    .Search(s => s
        .Query(q => q
            .Term(f => f.State, StateOfBeing.Stable)
        )
    )
);

// Signalled when the observable completes or fails
var waitHandle = new ManualResetEvent(false);
// Holds an exception raised on a background thread so it can be rethrown here
ExceptionDispatchInfo info = null;

var scrollAllObserver = new ScrollAllObserver<Project>(
    onNext: response => ProcessResponse(response.SearchResponse),
    onError: e =>
    {
        info = ExceptionDispatchInfo.Capture(e);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set()
);

// Subscribing starts the scroll
scrollAllObservable.Subscribe(scrollAllObserver);

// Block until completion or failure, then rethrow any failure with its original stack trace
waitHandle.WaitOne();
info?.Throw();
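The wait-and-rethrow pattern above (a ManualResetEvent signalled from onError/onCompleted, plus ExceptionDispatchInfo to move a background exception onto the waiting thread) can be exercised on its own. In this sketch DelegateObserver, BackgroundObservable and ObserverWait are hypothetical helpers written only for illustration; DelegateObserver mimics the onNext/onError/onCompleted constructor shape used with ScrollAllObserver<T>, but it is not that type.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical observer that forwards each notification to a delegate.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Hypothetical observable that pushes 0..count-1 from a background task and,
// if failAt is set, reports an error instead of that item.
public sealed class BackgroundObservable : IObservable<int>
{
    private readonly int _count;
    private readonly int? _failAt;

    public BackgroundObservable(int count, int? failAt = null)
    {
        _count = count;
        _failAt = failAt;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        Task.Run(() =>
        {
            for (var i = 0; i < _count; i++)
            {
                if (i == _failAt)
                {
                    observer.OnError(new InvalidOperationException($"failed at item {i}"));
                    return;
                }
                observer.OnNext(i);
            }
            observer.OnCompleted();
        });
        return new NoOpSubscription();
    }

    // Unsubscribing is not supported in this sketch.
    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

public static class ObserverWait
{
    // Blocks until the observable completes or fails, then rethrows any failure.
    public static List<int> Drain(IObservable<int> observable)
    {
        var received = new List<int>();
        ExceptionDispatchInfo info = null;
        using (var waitHandle = new ManualResetEvent(false))
        {
            observable.Subscribe(new DelegateObserver<int>(
                onNext: received.Add,
                onError: e => { info = ExceptionDispatchInfo.Capture(e); waitHandle.Set(); },
                onCompleted: () => waitHandle.Set()));
            waitHandle.WaitOne();
        }
        info?.Throw();
        return received;
    }
}
```

Capturing the exception and calling Throw() on the waiting thread keeps the original stack trace, whereas an exception left on the background thread would never reach the caller.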