Um serviço de streaming de dados permite a integração entre sistemas em tempo real, onde uma aplicação pode reagir à um fluxo de dados gerado por outra aplicação. Um software robusto e confiável que viabiliza esse tipo de comunicação é o Apache Kafka.

kafka_diagram

Vamos pular a parte da implantação do software e das suas políticas de administração, e partir para as dicas de como conectar a sua aplicação a um Tópico no Kafka. No mundo .NET a biblioteca mais atualizada que encapsula essa comunicação é o Confluent Kafka.

Neste tutorial vamos considerar que você já possui uma aplicação .NET Core/Standard 2.0 e deseja implementar o padrão Publish/Subscribe. Primeiramente você deve importar os pacotes Nuget. Faça assim:

Install-Package Confluent.Kafka

Deveria funcionar certo? Há um problema de compatibilidade dessa biblioteca quando usamos a versão .NET Core 2.0 e você recebe a seguinte mensagem ao compilar o projeto:

warning NU1701: Package 'librdkafka.redist 0.11.0' was restored using 
'.NETFramework,Version=v4.6.1' 
instead of the project target framework '.NETCoreApp,Version=v2.0'. 
This package may not be fully compatible with your project.

Há inclusive uma issue no GitHub do Confluent Kafka já com uma solução é simples. Modifique o seu csproj da seguinte forma:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netstandard2.0</TargetFramework>
  </PropertyGroup>
  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
    <NoWarn>1701;1702;1705;NU1701</NoWarn>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="0.11.0" NoWarn="NU1701" />
  </ItemGroup>
</Project>

A modificação relevante é uma instrução de compilação para ignorar o warning NU1701. Vamos agora aos detalhes da implementação do padrão Publish/Subscribe. Você precisa da string de conexão com o Kafka. Geralmente no formato “enderecoIPdoKafka:9092” e de um nome para o seu tópico que pode ser simples como “meutopico01”.

Adicione os namespaces:

using Confluent.Kafka;
using Confluent.Kafka.Serialization;

Para produzir mensagens utilize a classe Producer<X,Y> onde X e Y são tipos representando as estruturas das chaves e valores de um tópico no Kafka. Você pode construir seu produtor assim:

Producer<string, string> _producer = new Producer<string, string>(
    new Dictionary<string, object>()
    {{ "bootstrap.servers", "enderecoIPdoKafka:9092" }}, 
    new StringSerializer(Encoding.UTF8), 
    new StringSerializer(Encoding.UTF8));

As linhas anteriores criam um produtor de mensagens, possibilitando executar uma publicação de mensagens com as próximas linhas:

Message<string, string> message = await _producer.ProduceAsync(
    "meutopico01", "minhachave00", "meus dados");

Do lado do Consumer o código é semelhante. Você constrói o Consumer com o código abaixo:

Consumer<string, string> _consumer = new Consumer<string, string>(
    new Dictionary<string, object>() {
        { "group.id", "consumer" },
        {"bootstrap.servers", "enderecoIPdoKafka:9092" }},
        new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8));
    }

E para ler mensagens eu tenho utilizado um loop como este:

_consumer.Assign(new List
{
    new TopicPartitionOffset("meutopico01", 0, 0)
});

while (true)
{
    Message<string, string> msg;

    if (_consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
    {
        Console.WriteLine($"Topic: {msg.Topic}");
        Console.WriteLine($"Key: {msg.Key}");
        Console.WriteLine($"Value: {msg.Value}");
    }
}

Se você deseja ver uma implementação real do uso do Apache Kafka com .NET Core 2.0 recomendo que acesso o projeto Jambo no GitHub.