Diagnostic Source
editDiagnostic Source
editElasticsearch.Net and NEST support capturing diagnostics information using DiagnosticSource
and Activity
from the
System.Diagnostics
namespace.
To aid with the discoverability of the topics you can subscribe to and the event names they emit,
both topics and event names are exposed as strongly typed strings under Elasticsearch.Net.Diagnostics.DiagnosticSources
Subscribing to DiagnosticSources means implementing IObserver<DiagnosticListener>
or using .Subscribe(observer, filter)
to opt in to the correct topic.
Here we choose the more verbose IObserver<>
implementation
public class ListenerObserver : IObserver<DiagnosticListener>, IDisposable { private long _messagesWrittenToConsole = 0; public long MessagesWrittenToConsole => _messagesWrittenToConsole; public Exception SeenException { get; private set; } public void OnError(Exception error) => SeenException = error; public bool Completed { get; private set; } public void OnCompleted() => Completed = true; private void WriteToConsole<T>(string eventName, T data) { var a = Activity.Current; Interlocked.Increment(ref _messagesWrittenToConsole); } private List<IDisposable> Disposables { get; } = new List<IDisposable>(); public void OnNext(DiagnosticListener value) { void TrySubscribe(string sourceName, Func<IObserver<KeyValuePair<string, object>>> listener) { if (value.Name != sourceName) return; var subscription = value.Subscribe(listener()); Disposables.Add(subscription); } TrySubscribe(DiagnosticSources.AuditTrailEvents.SourceName, () => new AuditDiagnosticObserver(v => WriteToConsole(v.Key, v.Value))); TrySubscribe(DiagnosticSources.Serializer.SourceName, () => new SerializerDiagnosticObserver(v => WriteToConsole(v.Key, v.Value))); TrySubscribe(DiagnosticSources.RequestPipeline.SourceName, () => new RequestPipelineDiagnosticObserver( v => WriteToConsole(v.Key, v.Value), v => WriteToConsole(v.Key, v.Value) )); TrySubscribe(DiagnosticSources.HttpConnection.SourceName, () => new HttpConnectionDiagnosticObserver( v => WriteToConsole(v.Key, v.Value), v => WriteToConsole(v.Key, v.Value) )); } public void Dispose() { foreach(var d in Disposables) d.Dispose(); } }
Thanks to DiagnosticSources
, you do not have to guess the topics emitted.
The DiagnosticListener.Subscribe
method expects an IObserver<KeyValuePair<string, object>>
which is a rather generic message contract. As a subscriber, it’s useful to know what object
is in each case.
To help with this, each topic within the client has a dedicated Observer
implementation that
takes an onNext
delegate typed to the context object actually emitted.
The RequestPipeline diagnostic source emits a different context objects the start and end of the Activity
For this reason, RequestPipelineDiagnosticObserver
accepts two onNext
delegates,
one for the .Start
events and one for the .Stop
events.
Subscribing to topics
editAs a concrete example of subscribing to topics, let’s hook into all diagnostic sources and use
ListenerObserver
to only listen to the ones from Elasticsearch.Net
using(var listenerObserver = new ListenerObserver()) using (var subscription = DiagnosticListener.AllListeners.Subscribe(listenerObserver)) { var pool = new SniffingConnectionPool(new []{ TestConnectionSettings.CreateUri() }); var connectionSettings = new ConnectionSettings(pool) .DefaultMappingFor<Project>(i => i .IndexName("project") ); var client = new ElasticClient(connectionSettings); var response = client.Search<Project>(s => s .MatchAll() ); listenerObserver.SeenException.Should().BeNull(); listenerObserver.Completed.Should().BeFalse(); listenerObserver.MessagesWrittenToConsole.Should().BeGreaterThan(0); }