大家好,我是深山踏红叶,今天来聊一聊
安装
首先, 老规矩,先安装依赖包
dotnet add package Confluent.Kafka
生产者
在 Program.cs
中配置 Kafka 生产者,使其作为服务注入到依赖注入容器中。这样可以在应用的任何地方轻松地使用 Kafka 生产者。
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var builder = WebApplication.CreateBuilder(args);
// 配置 Kafka 生产者
builder.Services.AddSingleton<IProducer<, string>>(provider =>
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.All,
//DeliveryReportOnlyError = true, //net分享
MessageSendMaxRetries = 3
};
returnnew ProducerBuilder<, string>(config).Build();
});
// 注册 Kafka 生产者
builder.Services.AddSingleton();
var app = builder.Build();
创建一个服务类来封装 Kafka 消息生产者。
using Confluent.Kafka;
using System.Threading.Tasks;
publicclassKafkaProducerService
{
privatereadonly IProducer<, string> _producer;
public KafkaProducerService(IProducer<, string> producer)
{
_producer = producer;
}
public async Task ProduceMessageAsync(string topic, string message)
{
try
{
var result = await _producer.ProduceAsync(topic, new Message<, string> { Value = message });
Console.WriteLine($"Message '{result.Value}' delivered to {result.TopicPartitionOffset}");
}
catch (ProduceException<, string> e)
{
Console.WriteLine($"Error producing message: {e.Error.Reason}");
}
}
}
使用它,在需要的地方注入 KafkaProducerService
并使用它来发送消息。
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
[ApiController]
[Route("api/[controller]")]
publicclassKafkaController : ControllerBase
{
privatereadonly KafkaProducerService _producerService;
public KafkaController(KafkaProducerService producerService)
{
_producerService = producerService;
}
[HttpPost]
public async Task ProduceMessage([FromBody] string message)
{
await _producerService.ProduceMessageAsync("test-topic", message);
return Ok("Message sent to Kafka");
}
}
消费者
Kafka 消费者可以作为单独的后台服务运行,以便异步消费来自 Kafka 的消息。
ASP.NET Core 中,可以使用 IHostedService
来实现后台消费者服务。
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;
publicclassKafkaConsumerService : IHostedService
{
privatereadonly IConsumer<Ignore, string> _consumer;
public KafkaConsumerService()
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-consumer-group",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
StatisticsIntervalMs = 5000
};
_consumer = new ConsumerBuilder<Ignore, string>(config).Build();
}
public Task StartAsync(CancellationToken cancellationToken)
{
// 启动消费者线程
Task.Run(() => ConsumeMessages(cancellationToken), cancellationToken);
return Task.CompletedTask;
}
private void ConsumeMessages(CancellationToken cancellationToken)
{
_consumer.Subscribe("test-topic");
while (!cancellationToken.IsCancellationRequested)
{
try
{
var consumeResult = _consumer.Consume(cancellationToken);
Console.WriteLine($"Consumed message: {consumeResult.Message.Value} at {consumeResult.TopicPartitionOffset}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error: {e.Error.Reason}");
}
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_consumer.Close();
return Task.CompletedTask;
}
}
在 Program.cs
中将消费者服务注册为后台服务:
var builder = WebApplication.CreateBuilder(args);
// 注册 Kafka 生产者和消费者服务
builder.Services.AddSingleton<IProducer<, string>>(provider =>
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
returnnew ProducerBuilder<, string>(config).Build();
});
builder.Services.AddHostedService(); // 注册消费者为后台服务
var app = builder.Build();
// 中间件等其他配置...
app.Run();
错误处理和重试机制
在生产和消费消息时,错误是不可避免的。你可以根据需要增加重试逻辑或使用回调处理失败。可以通过配置 Kafka 消费者的 AutoOffsetReset
和 EnableAutoCommit
等选项来控制消费者行为。
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-consumer-group",
AutoOffsetReset = AutoOffsetReset.Earliest, // 从最早的消息开始消费
EnableAutoCommit = false // 手动提交偏移量
};
提高性能
o 批量发送消息:使用 ProduceAsync
方法的批量发送功能。o 异步消费:确保消费者在处理消息时不会阻塞主线程。 o 分区策略:合理分配消息到不同的分区,以提高并发处理能力。
总结
通过上面的操作,你可以在 ASP.NET Core 应用中实现 Kafka 生产者和消费者的功能。生产者可以将消息发送到 Kafka,消费者则可以从 Kafka 中异步消费消息。还可以通过优化和扩展该系统,如使用序列化、分区策略、消费者组协调等。
进一步阅读
o Kafka 官方文档 o Confluent.Kafka GitHub