Skip to main content

Описание работы Kafka

Общее описание

Kafka - это такой сетевой "лог-файл" в который можно писать сообщения и из которого их можно читать. Kafka часто используется как интеграционная шина, поэтому её можно сравшивать с RabbitMQ, и в первую очередь сравнить их нужно чтобы подчеркнуть различия.

RabbitMQ - это броккер сообщений. Он занимается передачей сообщений. В него можно послать сообщение, и оно, вероятно, будет доставлено адресату. После доставки сообщение удаляется из системы. Если при обработке сообщения возникла ошибка, сообщение может быть заново помещено в очередь.

В свою очередь кафка - это хранилище упорядоченных сообщений. У них нет адресата. Они не удаляются из системы если их кто-то прочитал. Подразумевается, что одно и то же сообщение может быть прочитано разными потребителями (consumer) с разными целями, и факт прочтения сообщения одним потребителем никак не влияет на других. Чтобы отслеживать какие сообщения уже были прочитаны потребителем, вводится понятие "смещение" (offset) - номер сообщения в последовательности.

Kafka хранит у себя текущее смещение для каждого известного потребителя, поэтому потребителю не нужно хранить его у себя, достаточно сделать запрос вида "дай следующее сообщение", а после "увеличь смещение на единицу".

Конкретное описание

Сообщения пишутся в топик (topic). Топик - это последовательность сообщений. У топика нет схемы, в него могут писать и из него могут читать кто угодно и что угодно. Процесс, который пишет называется продюсер (producer). Процесс который читает - консюмер (consumer). Внутри топик состоит из партиций (partition). Партиция это уже реальный файл на диске. Если вы хотите распределить нагрузку по нескольким серверам, кафки, то вам нужно настроить топик так, чтобы у него было больше одной партиции.

Партиции ещё интересны тем, что они задают максимальное количество параллельных потоков конкурентного чтения. Не параллельного, а именно конкурентного. Вы можете иметь сколько угодно консюмеров читающих из одного топика. Все они получат каждое сообщение. Но если вам нужно сделать так чтобы сообщение было гарантировано прочитано только одним консюмером из определённой группы, вам нужно создать столько партиций сколько параллельных потоков вы хотите сделать.

Чтобы настроить несколько консюмеров на конкурентное чтение сообщений, нужно задать им одинаковый ID консюмер-группы (consumer group). Если у консюмера задана консюмер-группа, то кафка начинает сохранять смещения не для конкретного консюмера, а для группы. Таким образом, когда первый консюмер прочитает сообщение, он увеличит смещение, и второй консюмер иэ той же группы получит уже следующее сообщение, а не то же самое.

Настройка сервиса для работы с кафкой

Мы используем расширения php для работы с кафкой phprdkafka. Оно уже установлено в базовом docker образе и ничего дополнительно делать не нужно. Для удобства есть три пакета для ларавеля, которые скрывают низкоуровневые операции, предоставляя более простой интерфейс для чтения и записи сообщений в топики.

ensi/laravel-phprdkafka

Этот пакет добавляет в систему менеджер подключений к кафке, по аналогии с менеджером подключений к БД. Вы можете описать в config/kafka.php параметры подключения, отдельно для консюмеров, отдельно для продюсеров, можно задать несколько подключений с разными именами. И в дальнейшем можно получать уже сконфигурированные объекты RdKafka\Producer и RdKafka\KafkaConsumer просто вызывая соотвествующие методы у сервиса KafkaManager, получив его из сервис-контейнера или же обращаясь к нему через фасад Kafka.

Пример конфигурации:

# config/kafka.php
return [
'consumers' => [
'default' => [
'metadata.broker.list' => env('KAFKA_BROKER_LIST'),
'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'),
'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'),
'sasl.username' => env('KAFKA_SASL_USERNAME'),
'sasl.password' => env('KAFKA_SASL_PASSWORD'),
'log_level' => env('KAFKA_DEBUG', false) ? (string) LOG_DEBUG : (string) LOG_INFO,
'debug' => env('KAFKA_DEBUG', false) ? 'all' : null,

// consumer specific options
'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')),
'enable.auto.commit' => true,
'auto.offset.reset' => 'beginning',
'allow.auto.create.topics' => true,
],
],
'producers' => [
// ...
]
];

Настраивая подключение вы можете использовать любые параметры rdkafka. В пакете уже установлены некоторые параметры по умолчанию:

  • group.id - идентификатор консюмер-группы. Он задаётся всегда, не важно планируете вы использовать один инстанс консюмера или несколько. Когда вам захочется отмасштабировать консюмеры, они уже будут в одной группе, останется только следить за тем чтобы хватало партиций.
  • enable.auto.commit - автоматически обновляет смещение через некоторое время после получения сообщения
  • auto.offset.reset - задаёт поведение в случае, когда для текущей консюмер-группы в кафке не сохранено смещение в каком-то топике. beginning означает что будут прочитаны все сообщения самого начала.
  • allow.auto.create.topics - разрешает консюмеру создать топик при обращении к нему

ensi/laravel-phprdkafka-consumer

Этот пакет добавляет в систему artisan команду kafka:consume topic и возможность настроить обработчик сообщения. Пример настройки:

# config/kafka-consumer.php
return [
'global_middleware' => [ TraceEventKafkaMiddleware::class ],

'processors' => [
[
'topic' => env('APP_ENV') . '.catalog.fact.offers.1',
'consumer' => 'default',
'type' => 'action',
'class' => \App\Domain\Kafka\Actions\Listen\ListenOfferAction::class,
'queue' => false,
'consume_timeout' => 5000,
],
]
];

Самое главное здесь - это то что вы указываете каким классом будут обрабатываться сообщения из конкретного топика, а так же какой консюмер будет использоваться для получения сообщений. Если вам для какого-то топика нужно задать другие настройки консюмера, то нужно создать другое подключение в config/kafka.php и сослаться на него.

Класс обработчик - это просто класс с методом action(RdKafka\Message $message), внутри которого вы описываете логику обработки сообщения.

class ListenOfferAction {
public function action (Message $message)
{
// ...
}
}

Кроме того вы можете задавать middleware, которые очень похожи на http middleware:

class TraceEventKafkaMiddleware {
public function handle(Message $message, Closure $next): mixed
{
// ...
return $next($message);
}
}

ensi/laravel-phprdkafka-producer

Этот пакет предоставляет обёртку над RdKafka\Producer, которая выполняет рутинные задачи.

$producer = new HighLevelProducer("my-topic", "my-producer");
$producer->sendOne($payload);

Под капотом HighLevelProducer получает объект продюсера, настройки которого описаны в config/kafka.php, создаёт топик, отправляет сообщение, выполняет flush чтобы сообщение гарантировано ушло из буфера в кафку.