Using Grafana Agent to proxy AWS Kinesis Data Firehose into Grafana Loki.

Running workloads on AWS EKS with EC2 instances is straightforward and something you just do without thinking much about it. Switch to Fargate, however, and there is a whole set of other things you need to think about and skills to pick up.

One of those things is logging. So I’ve been experimenting with Kinesis Data Firehose and Grafana Agent to get a flow of logs into Grafana Loki, which is what this post is all about.

Getting Grafana Agent up and running

I started off by using Grafana’s own instructions for how to deploy the agent in flow mode.

But running helm install alone won’t get you far, so let’s take a look at the values I used.
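
For completeness, the install itself is just the grafana-agent chart from Grafana’s Helm repository with the values described below. The release name and namespace here are assumptions; naming the release grafana-agent is what should give you a Service called grafana-agent, which the ingress annotations further down rely on.

helm repo add grafana https://grafana.github.io/helm-charts
helm repo update
helm upgrade --install grafana-agent grafana/grafana-agent \
  --namespace grafana-agent --create-namespace \
  --values values.yaml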

The values

The agent will take a stream of batched logs from Firehose, so we will use a standard deployment instead of a daemonset.

controller:
  type: deployment

The awsfirehose receiver will be set up to listen on a specific port, so we add that as an extra port on the deployment.

agent:
  extraPorts:
   - name: "faro"
     port: 12347
     targetPort: 12347
     protocol: "TCP"

The name "faro" is the port name defined in the ingress, so we use the same here.

Next is the ingress. Here I used aws-load-balancer-controller annotations to get the functionality I needed.

ingress:
  annotations:
    alb.ingress.kubernetes.io/backend-protocol: "HTTP"
    alb.ingress.kubernetes.io/certificate-arn: "arn:aws:acm:<region>:<account_id>:certificate/<certificate_id>"
    alb.ingress.kubernetes.io/healthcheck-port: "80"
    alb.ingress.kubernetes.io/listen-ports: '[{"HTTPS": 443}]'
    alb.ingress.kubernetes.io/scheme: "internet-facing"
    alb.ingress.kubernetes.io/ssl-redirect: "443"
    alb.ingress.kubernetes.io/target-type: "ip"
    alb.ingress.kubernetes.io/actions.grafana-agent: >
      {"type":"forward","forwardConfig":{"targetGroups":[{"serviceName":"grafana-agent","servicePort":"12347","weight":1}]}}      
    alb.ingress.kubernetes.io/conditions.grafana-agent: >
      [{"field":"http-header","httpHeaderConfig":{"httpHeaderName": "X-Amz-Firehose-Access-Key", "values":["a-really-long-and-scret-header-value"]}}]      
    external-dns.alpha.kubernetes.io/hostname: grafana-agent.example.domain.com
  enabled: true
  hosts:
  - grafana-agent.example.domain.com
  path: /*
  pathType: ImplementationSpecific
  ingressClassName: "alb"

The most noticeable configurations are the alb.ingress.kubernetes.io/actions.grafana-agent and alb.ingress.kubernetes.io/conditions.grafana-agent annotations.

They set up forward rules to the correct Kubernetes Service, but also require that a specific header (X-Amz-Firehose-Access-Key) is present on each request. Requests without it won’t be forwarded.

Other security measures could be taken, but this was the easiest one to implement. The X-Amz-Firehose-Access-Key header is already something Kinesis Data Firehose sends when communicating with third-party HTTP endpoints. I also tried using a Security Group with defined CIDR ranges, but that did not seem to work with an http-endpoint configuration in Firehose.
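
A quick way to sanity-check the rule is a pair of requests against the ingress hostname, one without the header and one with it. The exact status codes depend on your ALB’s default action and which path you hit, but the gist is:

# Without the header no rule matches, so the listener's default action answers (typically a 404 from the ALB)
curl -i https://grafana-agent.example.domain.com/

# With the header the request is forwarded to the grafana-agent Service
curl -i https://grafana-agent.example.domain.com/ \
  -H "X-Amz-Firehose-Access-Key: a-really-long-and-secret-header-value"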

The final piece is the configmap with the Grafana Agent River configuration.

agent:
  configMap:
    content: |
      loki.source.awsfirehose "loki_fh_receiver" {
        http {
          listen_address = "0.0.0.0"
          listen_port    = 12347
        }
        forward_to = [
          loki.process.process_firehose.receiver,
        ]
      }
      loki.process "process_firehose" {
        forward_to = [
          loki.write.local.receiver,
        ]
        stage.json {
          expressions = {log = "", kubernetes = ""}
        }
        stage.json {
          source      = "kubernetes"
          expressions = {pod = "pod_name", namespace = "namespace_name", container = "container_name", labels = ""}
        }
        stage.static_labels {
          values = {
            cluster = "my-cluster",
            environment = "testing",
            region = "<region>",
          }
        }
        stage.labels {
          values = {
            pod = "",
            namespace = "",
            container = "",
            app = "labels.\"app.kubernetes.io/name\"",
            instance = "labels.\"app.kubernetes.io/instance\"",
            component = "labels.\"app.kubernetes.io/component\"",
          }
        }
        stage.output {
          source = "log"
        }
      }
      loki.write "local" {
        endpoint {
          url       = "http://loki-write.loki.svc.cluster.local:3100/loki/api/v1/push"
          tenant_id = "meta-monitoring"
        }
      }      

Breakdown of the River configuration

loki.source.awsfirehose "loki_fh_receiver" {}

This sets up a listener for the awsfirehose receiver on the given port and forwards incoming entries to the loki.process step.

loki.process "process_firehose" {}

The process step uses a json stage to parse the batched log records coming in.

The next json stage parses the kubernetes block of the Firehose message.

Then we add some static labels, and after that define the dynamic labels and where their values come from. The most difficult part was figuring out how to reference the dot- and slash-notated keys in the labels that come from stage.json {source="kubernetes"}.
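
To make the two json stages easier to follow, this is roughly the shape of record they expect. The values here are made up; the keys mirror the expressions in the configuration above:

{
  "log": "the actual log line emitted by the container",
  "kubernetes": {
    "pod_name": "my-app-6d9f7c9b4-xk2lp",
    "namespace_name": "my-namespace",
    "container_name": "my-app",
    "labels": {
      "app.kubernetes.io/name": "my-app",
      "app.kubernetes.io/instance": "my-app-prod",
      "app.kubernetes.io/component": "server"
    }
  }
}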

The final stage outputs the part of the original Firehose message that contains the actual log line.

All processing is now done and loki.write "local" {} will push the logs to Loki.

With Grafana Agent deployed, let’s take a look at the Firehose setup.

Firehose and FluentBit

Setting up Firehose and getting data into it is fairly simple. Use your preferred IaC tool to set up the needed resources.

I used Terraform to create the following resources.

resource "aws_s3_bucket" "this" {
  bucket = "firehose"
}

resource "aws_kinesis_firehose_delivery_stream" "this" {
  name        = "grafana-agent"
  destination = "http_endpoint"

  http_endpoint_configuration {
    url                = "https://grafana-agent.example.domain.com"
    name               = "http-endpoint"
    role_arn           = aws_iam_role.this.arn
    s3_backup_mode     = "FailedDataOnly"
    access_key         = "a-really-long-and-secret-header-value"

    s3_configuration {
      role_arn           = aws_iam_role.this.arn
      bucket_arn         = aws_s3_bucket.this.arn
    }

    request_configuration {
      content_encoding = "NONE"
    }
  }
}

resource "aws_iam_role" "this" {
  name        = "grafana-agent"

  assume_role_policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Effect": "Allow"
    }
  ]
}
POLICY

}

data "aws_iam_policy_document" "this" {
  statement {
    actions = [
      "s3:AbortMultipartUpload",
      "s3:GetBucketLocation",
      "s3:GetObject",
      "s3:ListBucket",
      "s3:ListBucketMultipartUploads",
      "s3:PutObject",
    ]

    resources = [
      aws_s3_bucket.this.arn,
      "${aws_s3_bucket.this.arn}/*",
    ]

    effect = "Allow"
  }
}
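
One thing to note: the policy document above is only a data source and still has to be attached to the role. A minimal sketch of that attachment, with the resource name assumed, could be:

resource "aws_iam_role_policy" "this" {
  name   = "grafana-agent"
  role   = aws_iam_role.this.id
  policy = data.aws_iam_policy_document.this.json
}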

The last step is to enable logging for Fargate pods in your EKS cluster.

Deploy this configmap to configure FluentBit on Fargate.

apiVersion: v1
kind: ConfigMap
metadata:
  name: aws-logging
  namespace: aws-observability
data:
  flb_log_cw: "false"
  filters.conf: |
    [FILTER]
        Name                kubernetes
        Match               kube.*
        Merge_Log           On
        Buffer_Size         0
        Kube_Meta_Cache_TTL 300s    
  output.conf: |
    [OUTPUT]
        Name kinesis_firehose
        Match *
        region eu-west-1
        delivery_stream grafana-agent    
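
The configmap has to live in the aws-observability namespace, which per the Fargate logging documentation referenced below also needs the aws-observability: enabled label. Assuming the manifest above is saved as aws-logging.yaml, applying it looks something like:

kubectl create namespace aws-observability
kubectl label namespace aws-observability aws-observability=enabled
kubectl apply -f aws-logging.yaml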

All done, and you should see logs flowing into Loki.
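
One way to verify is a quick logcli query. The read endpoint here is a guess for your environment; the tenant matches the tenant_id set in the agent configuration:

logcli \
  --addr=http://loki-read.loki.svc.cluster.local:3100 \
  --org-id=meta-monitoring \
  query '{cluster="my-cluster"}'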

Two things I noticed but never looked into were cleanup of the log messages and log timestamps.

Logs are batched through Firehose and Grafana Agent, so the timestamp that ends up in Loki will differ from the actual timestamp of each log line. That could be something to look at, to see what changes to the Grafana Agent configuration would make it more correct.
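
I never got around to testing it, but one direction could be to have FluentBit include the timestamp in the record (the kinesis_firehose output has time_key and time_key_format options for this) and then parse it with a timestamp stage in loki.process. A rough, untested sketch of the extra stages:

// extend the first json stage to also pull out the time field added by FluentBit
stage.json {
  expressions = {log = "", kubernetes = "", time = ""}
}

// use it as the entry timestamp; the format has to match time_key_format
stage.timestamp {
  source = "time"
  format = "RFC3339"
}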

References

Documentation from Amazon on Fargate logging.

Repo for aws-for-fluent-bit.

Grafana Agent component reference.