Deriving metrics from logs
You can use OpenSearch Data Prepper to derive metrics from logs.
The following example pipeline receives incoming logs through the http source plugin and parses them with the grok processor. It then uses the aggregate processor to extract the bytes metric, aggregate it over a 30-second window, and derive histograms from the results.
This pipeline writes data to two different OpenSearch indexes:
- logs: This index stores the original, un-aggregated log events after they are processed by the grok processor.
- histogram_metrics: This index stores the derived histogram metrics extracted from the log events by the aggregate processor.
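Once the pipeline is running, one way to verify that both indexes are being populated is a standard search request against each; the index names below are taken from the sinks in the example pipeline further down:

GET logs/_search
GET histogram_metrics/_search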
The pipeline contains two sub-pipelines:
- apache-log-pipeline-with-metrics: Receives logs through an HTTP client like FluentBit, using grok to extract important values from the logs by matching the value in the log key against the Apache Common Log Format (see the sample event after this list). It then forwards the grokked logs to two destinations:
  - An OpenSearch index named logs, which stores the original log events.
  - The log-to-metrics-pipeline for further aggregation and metric derivation.
- log-to-metrics-pipeline: Receives the grokked logs from the apache-log-pipeline-with-metrics pipeline, aggregates them, and derives histogram metrics of bytes based on the values in the clientip and request keys. Finally, it sends the derived histogram metrics to an OpenSearch index named histogram_metrics.
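To make the grok step concrete, here is a sketch of what a single event might look like before and after the grok processor. The incoming event carries the raw line in the log key, and the COMMONAPACHELOG_DATATYPED pattern expands it into typed fields; the exact field set is determined by the grok pattern definitions, so treat the names below as illustrative:

Incoming event:

{ "log": "127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326" }

Event after the grok processor (abridged):

{
  "log": "127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326",
  "clientip": "127.0.0.1",
  "timestamp": "10/Oct/2000:13:55:36 -0700",
  "verb": "GET",
  "request": "/apache_pb.gif",
  "httpversion": "1.0",
  "response": 200,
  "bytes": 2326
}

The clientip, request, and bytes fields are exactly the keys that the log-to-metrics-pipeline uses for identification and histogram aggregation.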
Example pipeline
apache-log-pipeline-with-metrics:
  source:
    http:
      # Provide the path for ingestion. ${pipelineName} will be replaced with the name configured for this pipeline.
      # In this case it would be "/apache-log-pipeline-with-metrics/logs". This will be the FluentBit output URI value.
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG_DATATYPED}" ]
  sink:
    - opensearch:
        ...
        index: "logs"
    - pipeline:
        name: "log-to-metrics-pipeline"

log-to-metrics-pipeline:
  source:
    pipeline:
      name: "apache-log-pipeline-with-metrics"
  processor:
    - aggregate:
        # Specify the required identification keys
        identification_keys: ["clientip", "request"]
        action:
          histogram:
            # Specify the appropriate values for each of the following fields
            key: "bytes"
            record_minmax: true
            units: "bytes"
            buckets: [0, 25000000, 50000000, 75000000, 100000000]
        # Pick the required aggregation period
        group_duration: "30s"
  sink:
    - opensearch:
        ...
        index: "histogram_metrics"