ECS Ingest Channels

A specialization of Elastic.Ingest.Elasticsearch that offers two channel implementations. They make it easy to write ECS-formatted data and to bootstrap the target data streams or indices with ECS mappings and settings.

Installation

Add a reference to the Elastic.Ingest.Elasticsearch.CommonSchema package:

<PackageReference Include="Elastic.Ingest.Elasticsearch.CommonSchema" Version="8.6.0" />

Usage

EcsDataStreamChannel<TEvent>

A channel that specializes in writing data with a timestamp to Elasticsearch data streams.

A channel can be created to push data to the logs-dotnet-default data stream.

// Targets the logs-dotnet-default data stream
var dataStream = new DataStreamName("logs", "dotnet");
var bufferOptions = new BufferOptions { };
// transport is an existing transport instance configured for your cluster
var options = new DataStreamChannelOptions<EcsDocument>(transport)
{
    DataStream = dataStream,
    BufferOptions = bufferOptions
};
var channel = new EcsDataStreamChannel<EcsDocument>(options);

Learn more about Elastic’s data stream naming convention in this blog post.
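To make the naming concrete, here is a minimal sketch of how the two arguments above map to the logs-dotnet-default name. It assumes the {type}-{dataset}-{namespace} scheme with the namespace falling back to "default" when omitted. DataStreamNaming and Compose are hypothetical names used for illustration only and are not part of the library.

```csharp
using System;

// Hypothetical helper, not part of Elastic.Ingest.Elasticsearch.
// Illustrates the {type}-{dataset}-{namespace} naming scheme only.
public static class DataStreamNaming
{
    // ns defaults to "default", mirroring the name shown in the text (assumption).
    public static string Compose(string type, string dataSet, string ns = "default")
        => $"{type}-{dataSet}-{ns}";
}
```

Under these assumptions, DataStreamNaming.Compose("logs", "dotnet") returns logs-dotnet-default.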

We can now push data to Elasticsearch using the EcsDataStreamChannel:

var doc = new EcsDocument
{
    Timestamp = DateTimeOffset.Now,
    Message = "Hello World!",
};
channel.TryWrite(doc);

EcsIndexChannel<TEvent>

A channel that specializes in writing catalog data to Elasticsearch indices.

We can create an EcsIndexChannel<> to push EcsDocument (or subclassed) instances.

var options = new IndexChannelOptions<CatalogDocument>(transport)
{
    IndexFormat = "catalog-data-{0:yyyy.MM.dd}",
    // BulkOperationIdLookup = c => null,
    TimestampLookup = c => c.Timestamp,
};
var channel = new EcsIndexChannel<CatalogDocument>(options);
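The CatalogDocument type is not defined in this section. A minimal sketch, assuming it derives from EcsDocument and exposes the Id, Title, and Created properties used in this section, might look like the following. The property types are guesses made for illustration.

```csharp
using System;
using Elastic.CommonSchema;

// Hypothetical document type; property names are taken from the usage
// in this section, property types are assumptions.
public class CatalogDocument : EcsDocument
{
    public string Id { get; set; }
    public string Title { get; set; }
    public DateTimeOffset Created { get; set; }
}
```

With this shape, TimestampLookup = c => c.Timestamp reads the Timestamp inherited from EcsDocument, not Created. If Created should drive the index name, the lookup would need to return it instead.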

Now we can push data using:

var doc = new CatalogDocument
{
    Created = date,
    Title = "Hello World!",
    Id = "hello-world"
};
channel.TryWrite(doc);

This will push data to a date-suffixed index such as catalog-data-2023.01.01 (for a timestamp on 1 January 2023), because TimestampLookup yields Timestamp to IndexFormat.

IndexFormat can also simply be a fixed string to write to an Elasticsearch alias/index.
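The way IndexFormat and the timestamp combine can be sketched with standard .NET composite formatting. This assumes the channel applies the format string to the value returned by TimestampLookup in the same way string.Format would. IndexNameSketch and Resolve are hypothetical names, not library API.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the library. Shows how a composite
// format string and a timestamp produce a concrete index name.
public static class IndexNameSketch
{
    public static string Resolve(string indexFormat, DateTimeOffset timestamp)
        // Invariant culture keeps the output independent of the machine's locale.
        => string.Format(CultureInfo.InvariantCulture, indexFormat, timestamp);
}
```

For example, Resolve("catalog-data-{0:yyyy.MM.dd}", new DateTimeOffset(2023, 1, 1, 0, 0, 0, TimeSpan.Zero)) returns catalog-data-2023.01.01, while a fixed string without a placeholder, such as "catalog-data", is returned unchanged.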

BulkOperationIdLookup determines if the document should be pushed to Elasticsearch using a create or index operation.
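The decision can be pictured with the sketch below. It assumes that a lookup returning a non-null id selects an index operation carrying that id, and that a missing lookup or a null result selects a create operation. That is our reading of the option and should be verified against the library. BulkOperationSketch and ChooseAction are hypothetical names.

```csharp
using System;

// Hypothetical illustration, not library code. Mirrors the assumed rule:
// id present -> "index", no id -> "create".
public static class BulkOperationSketch
{
    public static string ChooseAction<TEvent>(TEvent document, Func<TEvent, string> idLookup)
    {
        // A missing lookup behaves like a lookup that returns null.
        var id = idLookup?.Invoke(document);
        return id is null ? "create" : "index";
    }
}
```

Under these assumptions, the commented-out BulkOperationIdLookup = c => null in the options above would result in create operations, whereas c => c.Id would result in index operations for documents that have an Id.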

Bootstrapping the target data stream or index

Optionally, the target data stream or index can be bootstrapped as follows:

await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default");

This bootstraps the target with the ECS mappings and settings. If the index template already exists, no further bootstrapping will occur.

Just like Elastic.Ingest.Elasticsearch, the channel is aware that logs and metrics have default component templates and ensures that the new index template references them.