Kafka что это за программа

Содержание

Kafka что это за программа

флаг —strip 1 просит архиватор не создавать родительскую папку типа ~/kafka/kafka_2.13-2.6.0, а распаковывать содержимое сразю в ~/kafka.

4 Настройка сервера Kafka

Поведение Kafka по умолчанию не позволяет нам удалить тему, категорию, группу или название канала, в котором можно публиковать сообщения. Чтобы исправить это, давайте отредактируем файл конфигурации:

nano ~/kafka/config/server.properties

и добавим это в конец:

delete.topic.enable = true

5 Создание Systemd Unit — файлов и запуск сервера Kafka

Создаем файл для зукипера (необходим для работы Kafka)

sudo nano /etc/systemd/system/zookeeper.service

[Unit]

Requires=network.target remote-fs.target

After=network.target remote-fs.target

[Service]

Type=simple

User=kafka

ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties

Про Kafka (основы)


ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh

Restart=on-abnormal

[Install]

WantedBy=multi-user.target

секция Unit требует чтобы перед запуском zookeeper сеть и файловая система уже были готовы

секция Service назначает используемые при старте и остановке zookeeper скрипты

Теперь создаем то же самое для Kafka:

sudo nano /etc/systemd/system/kafka.service

[Unit]

Requires=zookeeper.service

After=zookeeper.service

[Service]

Type=simple

User=kafka

ExecStart=/bin/sh -c ‘/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>TutorialTopic».

7.2 Создайте продюссера и пошлите сообщение:

echo «Hello, World» | ~/kafka/bin/kafka-console-producer.sh —broker-list localhost:9092 —topic TutorialTopic > /dev/null

7.3 Создайте консьюмера и запустите приём сообщений:

~/kafka/bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic TutorialTopic —from-beginning

флаг —from-beginning даст возможность получить сообщения отправленные до запуска консьюмера. Вывод должен быть Hello, World.

Этот скрипт будет выполняться и принимать сообщения в реальном времени. Можете подключиться через другой терминал и попробовать отправить ещё что-нибудь в этот топик.

Также рекомендую скачать и установить клёвый десктопный клиент https://www.conduktor.io/ и законнектиться к серверу Kafka снаружи.

II. Создание Java SpringBoot проекта

Источником инфотмации послужила статья https://habr.com/ru/post/496182/, поэтому если что непонятно — вэлком туда.

Для начала создадим новый SpringBoot проект с зависимостями: Spring Web и Spring for Apache Kafka.

Мы не будем юзать Spring Web, он тут нужен только для того чтобы подтянулся Jackson. Либо можно не включать его в зависимости, а в файле pom.xml, прописать ручками

Что такое Apache Kafka и зачем это нужно


com.fasterxml.jackson.core jackson-databind

Создадим класс который будет слать сообщения каждые 3 секунды:

SuppressWarnings здесь потому что идея упорно говорит что такого бина не существует. Но он есть.

Теперь создадим класс консьюмера:

метод messageListener будет вызываться как поступит новое сообщение.

Група консьюмеров — это группа в рамках которой доставляется один экземпляр сообщения. Например, у Вас есть три консьюмера в одной группе, и все они слушают одну тему. Как только на сервере появляется новое сообщение с данной темой, оно доставляется кому-то одному из группы. Остальные два консьюмера сообщение не получают.

Ну и напоследок отредактируем файл application.properties:

#адрес сервера Kafka spring.kafka.bootstrap-servers=192.168.0.239:9092

Запускаем и тестируем! Можно юзать, например, Postman, либо встроенный в идею инструмент Tools > HTTP Client > Test RESTful Web Service.

Эксперименты

Продюссер при отправке сообщения может вернуть ответ об успешности или ошибке. Настрою коллбэк и буду выводить в консоль результат отправки сообщения. Дополним вызов kafkaTemplate.send:

ListenableFuture> future = kafkaTemplate.send(«msg», «currentTime», (new Date()).toString()); future.addCallback(System.out::println, System.err::println);

Теперь попробую перезагружать сервер Kafka. Что из этого вышло:

Если приложение запускается при недоступном сервере Kafka, То возникает исключение org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic msg not present in metadata after 60000 ms.

Если сервер Kafka становится недоступным в процессе работы, то ошибки обрабатываются в возвращаемом future объекте метода kafkaTemplate.send.

Пока Kafka недоступен, Spring копит отправленные но недоставленные сообщения внутни себя

  1. Если сервер недоступен менее 2-х минут, то как только Kafka станет доступен, спринг выплюнет все сообщения туда прикрепив к ним правильное время отправки.
  2. Если сервер недоступен более 2-х минут, то вызовется коллбэк с ошибкой доставки. Типа такого org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation и такого org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation. Причем пачкой для нескольких сообщений. В итоге получится такой цикл: накопление сообщений — пачка ошибок — накопление сообщений — следующая пачка ошибок. Не разбирался по какому принципу, но как-то так. При появлении сервера Kafka, накопленные будут отправлены, а те, что с ошибками, — потерялись навсегда.

Отправка и получение кастомных объектов

Сначала реализуем продюссер. Для отправки объектов будем использовать сериализацию в Json. Для каждого кастомного класса нам нужен свой KafkaTemplate. Прежде чем приступить к их созданию, сделаем пару кастомных классов: UserDTO и UserDTO.Address:

package ru.knastnt.kafkatest; public class UserDTO < private String name; private int age; private Address address; public static UserDTO getTestInstance()< UserDTO u = new UserDTO(); UserDTO.Address a = new UserDTO.Address(); a.setStreet(«Ленина»); a.setHouse(16); u.setName(«Иван»); u.setAge(25); u.setAddress(a); return u; >public String getName() < return name; >public void setName(String name) < this.name = name; >public int getAge() < return age; >public void setAge(int age) < this.age = age; >public Address getAddress() < return address; >public void setAddress(Address address) < this.address = address; >public static class Address < private String street; private int house; public String getStreet() < return street; >public void setStreet(String street) < this.street = street; >public int getHouse() < return house; >public void setHouse(int house) < this.house = house; >> >

Теперь настроим кастомные шаблоны. Всё поместим в отдельный класс KafkaProducerConfig:

Основное что мы тут делаем — назначаем сериализаторы для ключа и значения сообщений.

А внутрь цикла в методе run, допишем действия по отправке наших кастомных объектов:

//Шлём объекты через кастомные шаблоны sleep(2000); ListenableFuture> userFuture = kafkaUserTemplate.send(«msg2», «user», UserDTO.getTestInstance()); userFuture.addCallback(System.out::println, System.err::println); sleep(2000); ListenableFuture> userFuture2 = kafkaAddressTemplate.send(«msg3», «addr», UserDTO.getTestInstance().getAddress()); userFuture2.addCallback(System.out::println, System.err::println);

Предлагаю запуститься и посмотреть как объекты улетают в кафку (это будет видно в консоли). К тому же можете воспользоваться https://www.conduktor.io/ и убедиться что топики пополняются новыми сообщениями.

Читайте также:
Обожаю что это за программа

Сделаем теперь несколько консьюмеров чтобы получать эти сообщения из кафки. Это несколько сложнее, но принцип тот же.

Создаем класс KafkaConsumerConfig:

Собственно для каждого кастомного класса нужно сделать по параметризованному ConcurrentKafkaListenerContainerFactory — это спринговая потокобезопасная обёртка.

В каждую из них нужно заинджектить конфигурацию консьюмера. Я реализовал её в виде параметризованного метода, в котором ключ — строка, а значение — объект передаваемого класса в json формате.

Теперь добавим пару слушающих методов в наш класс Listener:

Источник: knasys.ru

Запуск Kafka Server

И если вы видите следующий текст на консоли, значит, он работает.

2018-06-10 06:38:44,477] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2018-06-10 06:38:44,478] INFO [KafkaServer started (kafka.server.KafkaServer)

Создать темы

Сообщения публикуются в темах. Используйте эту команду для создания новой темы.

➜ kafka_2.11-1.1.0 bin/kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 1 —partitions 1 —topic test
Created topic «test».

Вы также можете получить список всех доступных тем, выполнив следующую команду.

➜ kafka_2.11-1.1.0 bin/kafka-topics.sh —list —zookeeper localhost:2181
test

Как видите, он печатает, test ,

Отправка сообщений

Далее мы должны отправлять сообщения,производителииспользуются для этой цели. Давайте инициировать продюсера.

➜ kafka_2.11-1.1.0 bin/kafka-console-producer.sh —broker-list localhost:9092 —topic test
>Hello
>World

Вы запускаете консольный интерфейс производителя, который работает на порту 9092 по умолчанию. —topic позволяет установить тему, в которой будут публиковаться сообщения. В нашем случае тема test

Это показывает вам > подскажите, и вы можете ввести все, что вы хотите.

Сообщения хранятся локально на вашем диске. Вы можете узнать о его пути, проверив значение log.dirs в config/server.properties файл. По умолчанию они установлены на /tmp/kafka-logs/

Если вы перечислите эту папку, вы найдете папку с именем test-0 , После перечисления вы найдете 3 файла: 00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex

Если вы откроете 00000000000000000000.log в редакторе он показывает что-то вроде:

Похоже, что закодированные данные или разделитель разделены, я не уверен. Если кто-то знает этот формат, то дайте мне знать.

В любом случае, Кафка предоставляет утилиту, которая позволяет вам проверять каждое входящее сообщение.

➜ kafka_2.11-1.1.0 bin/kafka-run-class.sh kafka.tools.DumpLogSegments —deep-iteration —print-data-log —files /tmp/kafka-logs/test-0/00000000000000000000.log
Dumping /tmp/kafka-logs/test-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1528595323503 isvalid: true keysize: -1 valuesize: 5 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: Hello
offset: 1 position: 73 CreateTime: 1528595324799 isvalid: true keysize: -1 valuesize: 5 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: World

Вы можете увидеть сообщение с другими деталями, такими как offset , position а также CreateTime и т.п.

Использование сообщений

Сохраненные сообщения также должны быть использованы. Начнем с консольного потребителя.

➜ kafka_2.11-1.1.0 bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test —from-beginning

Если вы запустите, он будет сбрасывать все сообщения с начала до настоящего времени. Если вам просто интересно использовать сообщения после запуска потребителя, вы можете просто пропустить —from-beginning переключи его и беги. Причина в том, что старые сообщения не отображаются, потому что смещение обновляется, как только потребитель отправляет ACK брокеру Kafka для обработки сообщений. Вы можете увидеть рабочий процесс ниже.

Доступ к Кафке в Python

Для использования доступно несколько библиотек Python:

  • Кафка-Python — Библиотека сообщества с открытым исходным кодом.
  • PyKafka — Эта библиотека поддерживается Parsly и считается, что она является Pythonic API. В отличие от Kafka-Python, вы не можете создавать динамические темы.
  • Сливной питон кафка: — предлагается Confluent в качестве тонкой обертки вокруг librdkafka следовательно, это производительность лучше, чем два.

Для этого поста мы будем использовать открытый исходный кодКафка-Python,

Система оповещения рецептов в Кафке

Напоследок после о Elasticsearch, я очистил данные Allrecipes. В этом посте я собираюсь использовать тот же скребок, что и источник данных. Система, которую мы собираемся создать, представляет собой систему оповещения, которая будет отправлять уведомления о рецептах, если она соответствует определенному порогу калорий. Там будет две темы:

  • raw_recipes: — Он будет хранить сырой HTML каждого рецепта. Идея состоит в том, чтобы использовать эту тему в качестве основного источника наших данных, которые впоследствии могут быть обработаны и преобразованы в соответствии с потребностями.
  • parsed_recipes: -Как следует из названия, это будет проанализированный данные каждого рецепта в формате JSON.

Длина названия темы Кафки не должна превышать 249,

Типичный рабочий процесс будет выглядеть ниже:

устанавливать kafka-python с помощью pip

pip install kafka-python

Сырой рецепт производителя

Первая программа, которую мы собираемся написать, — продюсер. Он получит доступ к Allrecpies.com и получит сырой HTML и сохранит вraw_recipesтема.

Парсер рецептов

Следующий скрипт, который мы собираемся написать, будет служить как потребителем, так и производителем. Сначала он будет использовать данные из raw_recipes разделить и преобразовать данные в формат JSON, а затем опубликовать их в parsed_recipes тема. Ниже приведен код, который будет извлекать данные HTML из raw_recipes тема, разобрать, а затем кормить в parsed_recipes тема.

Все идет нормально. Мы сохранили рецепты как в формате raw, так и в формате JSON для дальнейшего использования. Далее мы должны написать потребителя, который будет связываться с parsed_recipes тема и генерировать оповещение, если определенно calories critera встречается.

JSON декодируется, а затем проверяется количество калорий, после того, как критерии соответствуют, выдается уведомление.

Вывод

Kafka — это масштабируемая, отказоустойчивая система обмена сообщениями с публикацией и подпиской, которая позволяет создавать распределенные приложения. Из-за его высокая производительность и эффективность она становится популярной среди компаний, которые производят большое количество данных из различных внешних источников и хотят получать данные в режиме реального времени. Я только что раскрыл суть этого. Изучите документы и существующую реализацию, и это поможет вам понять, как она лучше всего подходит для вашей следующей системы.

Эта статья первоначально опубликована Вот ,

щелчок Вот подписаться на мою рассылку для будущих сообщений.

Источник: machinelearningmastery.ru

Использовать Apache Kafka для гарантированного упорядочивания сообщений

Apache Kafka — это программное обеспечение для обмена сообщениями, используемое для передачи данных и информации между системами.

Учитывая возможности Kafka, он может поддерживать широкий спектр сценариев использования. В LinkedIn он был создан для сбора метрик об использовании приложений.

Основы Kafka

Очередь сообщений позволяет многим разрозненным приложениям взаимодействовать, посылая друг другу сообщения. Очередь — это очередь объектов, ожидающих обработки — в последовательном порядке, начиная с начала очереди. Приложения, которые читают данные из очереди сообщений, называются «потребителями». Приложения, которые отправляют сообщения в Kafka, называются «производителями».

В платформе данных сообщения могут быть данными, а потребители — программами обработки данных. Эти программы обработки могут быть ориентированы на пакетную обработку, а могут быть программами потоковой обработки в режиме реального времени.

kafka message queue

Почему именно Kafka?

  • Kafka обладает высокой масштабируемостью. Очень легко добавить большое количество потребителей без ущерба для производительности и надежности. Это потому, что Kafka не отслеживает, какие сообщения в теме были потреблены потребителями. Она просто хранит все сообщения в теме в течение настраиваемого периода времени.
  • Kafka также поддерживает различные модели потребления. Вы можете иметь одного потребителя, обрабатывающего сообщения в режиме реального времени, и другого потребителя, обрабатывающего сообщения в пакетном режиме в полностью разделенном режиме.
  • Kafka поддерживает возможность получения сообщений от широкого круга производителей.
  • Долговечность сообщений в Kafka высокая. Он сохраняет сообщения/события в течение заданного периода времени.
  • Kafka линейно масштабируется при больших объемах данных. Добавление дополнительных брокеров/кластеров увеличит пропускную способность или уменьшит задержку.
  • Kafka поддерживает отличную интеграцию с другими системами обработки данных. К ним относятся Apache Storm, Spark, NiFi, Flume и т.д. для завершения работы.
Читайте также:
Для чего программа vectric aspire

Примеры использования Kafka

Обмен сообщениями

Kafka хорошо работает в качестве замены более традиционного брокера сообщений. Брокеры сообщений используются по разным причинам (для разделения обработки и производителей данных, для буферизации необработанных сообщений и т.д.).

Отслеживание активности веб-сайта

Kafka можно использовать для построения конвейера отслеживания активности пользователей в виде набора каналов публикации-подписки в реальном времени. Активность на сайте (просмотры страниц, поиск или другие действия пользователей) может быть опубликована в центральных темах с одной темой для каждого типа активности.

Агрегация журналов

Kafka является заменой решения для агрегации журналов. Агрегация журналов обычно собирает физические файлы журналов с серверов и помещает их в центральное хранилище (файловый сервер или HDFS) для обработки. По сравнению с системами, ориентированными на журналы, такими как Scribe или Flume, Kafka предлагает такую же высокую производительность, более надежные гарантии долговечности благодаря репликации и гораздо меньшую сквозную задержку.

Архитектура Kafka и ее компоненты

Kafka Architecture

Брокеры Kafka

Брокер Kafka — это сервер, работающий в кластере Kafka (или, говоря иначе, кластер Kafka состоит из нескольких брокеров). Как правило, несколько брокеров работают согласованно, образуя кластер Kafka и обеспечивая балансировку нагрузки, надежное резервирование.

Apache ZooKeeper

Брокеры Kafka используют ZooKeeper для управления и координации работы кластера Kafka. ZooKeeper уведомляет все узлы, когда топология кластера Kafka меняется, в том числе когда брокеры и темы (топики) добавляются или удаляются.

Продюсеры Kafka

Продюсер Kafka служит источником данных, который оптимизирует, записывает и публикует сообщения в одной или нескольких темах (топиках) Kafka. Производители Kafka также сериализуют, сжимают и распределяют данные между брокерами с помощью разделения.

Потребители Kafka

Потребители читают данные, читая сообщения из тем, на которые они подписаны. Потребители принадлежат к группе потребителей. Каждый потребитель в определенной группе потребителей отвечает за чтение подмножества разделов каждой темы, на которую он подписан.

Топики Kafka

Топик Kafka определяет канал, по которому передаются данные. Производители публикуют сообщения в топики, а потребители читают сообщения из топика, на который они подписаны.

Партиции Kafka

В кластере Kafka темы делятся на партиции, а партиции реплицируются между брокерами. Из каждого раздела несколько потребителей могут читать из топика параллельно.

Группа потребителей

Группа потребителей Kafka включает связанных потребителей с общей задачей. Kafka отправляет сообщения из разделов темы потребителям в группе потребителей. В момент чтения каждый раздел читается только одним потребителем в группе.

Фактор репликации тем

Репликация тем необходима для создания отказоустойчивых и высокодоступных развертываний Kafka. Когда брокер выходит из строя, реплики тем на других брокерах остаются доступными, чтобы гарантировать, что данные остаются доступными и что развертывание Kafka избегает сбоев и простоев.

Kafka components and how they interact

Как обеспечить порядок сообщений

В Kafka порядок может быть гарантирован только в пределах раздела (партиции). Это означает, что если сообщения были отправлены от производителя в определенном порядке, брокер запишет их в партицию, и все потребители будут читать из нее в том же порядке. Поэтому, естественно, в теме с одной партицией легче обеспечить упорядочивание по сравнению с несколькими партициями.

Однако с одной партицией трудно добиться параллелизма и балансировки нагрузки.

Существует несколько способов, с помощью которых мы можем добиться упорядочивания сообщений, параллелизма и балансировки нагрузки одним способом. Давайте посмотрим, как Kafka обрабатывает порядок сообщений с одним брокером/одним партицией и несколькими брокерами/многими партициями.

Порядок сообщений при использовании одного брокера

Порядок сообщений в Kafka хорошо работает для одной партиции. Но с одним разделом трудно добиться параллелизма и балансировки нагрузки.

Давайте создадим топиком (с одной репликой и одной партицией) с именем топика «test»:

./kafka-topics.sh —create —zookeeper localhost:2181 —topic test —replication-factor 1 —partitions 1

Рассмотрим пример набора данных со схемами row_no, name, transaction_type и amount, как показано ниже:

0001,Test1,credit,10000 0002,Test2,debit,20000 0003,Test3,credit,30000 0004,Test4,debit,40000 0005,Test5,debit,50000

Kafka поставляется с клиентом командной строки, который будет принимать данные из файла или стандартного ввода и отправлять их в виде сообщений в кластер Kafka. По умолчанию каждая строка будет отправлена как отдельное сообщение.

Давайте опубликуем приведенный выше пример набора данных в нашей новой топика test:

./kafka-console-producer.sh —topic test —bootstrap-server localhost:9092 >0001,Test1,credit,10000 >0002,Test2,debit,20000 >0003,Test3,credit,30000 >0004,Test4,debit,40000 >0005,Test5,debit,50000

Порядок будет получен такой же, как и выше для одного топика.

./kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test —from-beginning >0001,Test1,credit,10000 >0002,Test2,debit,20000 >0003,Test3,credit,30000 >0004,Test4,debit,40000 >0005,Test5,debit,50000

Порядок сообщений при использовании нескольких брокеров

Существует три метода, с помощью которых мы можем сохранить порядок сообщений внутри разделов в Kafka. У каждого метода есть свои плюсы и минусы.

Метод 1: Round Robin или Spraying

Метод 2: Хеширование ключевых разделов

Метод 3: Пользовательский разделитель

Метод 1: Round Robin или Spraying (по умолчанию)

В этом методе разделитель будет посылать сообщения всем партициям по кругу, обеспечивая сбалансированную нагрузку на сервер.

Для примера создадим топик (с тремя репликами и тремя партициями) с именем топика «test2»:

./kafka-topics.sh —create —zookeeper localhost:2181 —topic test —replication-factor 3 —partitions 3

Опубликуем тот же набор данных в новой топике test2:

/.kafka-console-producer.sh —topic test2 —bootstrap-server localhost:9092 >0001,Test1,credit,10000 >0002,Test2,debit,20000 >0003,Test3,credit,30000 >0004,Test4,debit,40000 >0005,Test5,debit,50000

Давайте создадим потребителя, который будет получать сообщения из Kafka:

./kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test2 —from-beginning >0004,Test4,debit,40000 >0001,Test1,credit,10000 >0003,Test3,credit,30000 >0002,Test2,debit,20000 >0005,Test5,debit,50000

Давайте разберемся, что происходит под капотом. Есть три раздела (A, B и C). Раздел B работает быстро благодаря низкой сетевой и системной задержке, и сообщения, отправленные на него, были получены первыми. Затем идет раздел C, за которым следует A.

Этим методом достигается параллелизм и балансировка нагрузки, но не удается сохранить общий порядок, однако порядок внутри раздела сохраняется. Это метод по умолчанию, и он не подходит для некоторых бизнес-сценариев. Если дебетовая транзакция произойдет раньше кредитной, то это будет неясно для бизнес-пользователей, которые потребляют сообщения.

Для того чтобы преодолеть описанные выше сценарии и сохранить порядок сообщений, давайте попробуем другой подход.

Метод 2: Хеширование ключевых разделов

В этом методе мы можем создать ProducerRecord, указать ключ сообщения, вызвав new ProducerRecord (имя топика, ключ сообщения, сообщение).

Читайте также:
Python для начинающих что это за программа

Разделитель по умолчанию будет использовать хэш ключа, чтобы гарантировать, что все сообщения для одного и того же ключа попадут к одному и тому же производителю. Это самый простой и наиболее распространенный подход. Это тот же метод, который использовался. Для хеширования используется операция модуляции.

Hash(Key) % Количество партиций -> Номер партиции

Однако простая отправка строк текста приведет к сообщениям с нулевыми ключами. Чтобы отправлять сообщения с ключами и значениями, мы должны установить свойства parse.key и key.separator в командной строке при запуске производителя.

Ниже приведен фрагмент кода для метода хэширования, который устанавливает свойство parse.key в true, а для key.separator задает «:».

В приведенном ниже примере сообщений ключами являются key1, key2, а значениями — value1, value2.

— broker-list localhost:9092 — topic topic-name — property «parse.key=true» — property «key.separator=:» key1:value1 key2:value2

Давайте создадим топик (с тремя репликами и тремя партициями) с именем темы test1:

./kafka-topics.sh —create —zookeeper localhost:2181 —topic test1 —replication-factor 3 —partitions 3

И опубликуем несколько сообщений в топике test1 с ключевым значением для всех записей:

./kafka-console-producer.sh —topic test1 —bootstrap-server localhost:9092 —property «parse.key=true» —property «key.separator=:» >0001:0001,Test1,credit,10000 >0002:0002,Test2,debit,20000 >0001:0001,Test3,credit,30000 >0002:0002,Test4,debit,40000 >0001:0001,Test5,debit,50000

Как видно ниже, порядок сообщений внутри ключей сохраняется.

./kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test1 —from-beginning

./kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic test1 —from-beginning >0002:0002,Test2,debit,20000 >0002:0002,Test4,debit,40000 >0001:0001,Test1,credit,10000 >0001:0001,Test3,credit,30000 >0001:0001,Test5,debit,50000

Брокер назначает ключ 0001 для раздела A (раздел с высокой задержкой) и ключ 0002 для раздела B (раздел с низкой задержкой), используя метод хэширования ключей. Потребитель потребляет сообщение, основываясь на значении ключа в заказе.

С помощью этого метода мы можем поддерживать порядок сообщений в пределах ключа.

Но недостатком этого метода является то, что он использует случайное значение хэширования для передачи данных в назначенный раздел, и это приводит к перегрузке данных в один раздел.

Метод 3: Пользовательский разделитель

Мы можем написать собственную бизнес-логику, чтобы решить, какое сообщение должно быть отправлено в тот или иной раздел. При таком подходе мы можем упорядочить сообщения в соответствии с нашей бизнес-логикой и одновременно добиться параллелизма.

Custom Partitioner method

Основные выводы

  • Одной из важнейших функций Kafka является балансировка нагрузки на сообщения и гарантированное упорядочивание в распределенном кластере для достижения параллелизма.
  • Использование большего количества разделов приводит к увеличению пропускной способности и задержки. Однако это может привести к накладным расходам на обслуживание.
  • Используя раздел с хэширующим ключом, мы можем доставлять сообщения с одинаковым ключом по порядку, отправляя их в один и тот же раздел. Данные внутри раздела будут храниться в том порядке, в котором они были записаны. Поэтому данные, прочитанные из раздела, будут прочитаны по порядку для этого раздела с ключом производителя.
  • Используя Пользовательский разделитель, мы можем маршрутизировать сообщения, используя произвольные бизнес-правила.

Похожие записи:

  1. Какова производительность Kafka, если он записывает данные на диск?
  2. Как, где и когда Apache Kafka записывает данные
  3. Kafka: Удалить топик из консоли
  4. Kafka: увеличить количество партиций топика
  5. Установка Apache Kafka в CentOS Stream
  6. Что делает Apache Kafka таким быстрым
  7. Kraken v2.7.0 build 247

Источник: g-soft.info

Установка и работа с Kafka на Linux

Обновлено и опубликовано

Опубликовано: 03.10.2022

  • Установки и настройке Kafka.
  • Конфигурирование, а также запуск сервисов zookeeper и kafka.
  • Выполнение тестовых запросов к брокеру сообщений.

Так как установка Kafka выполняется путем распаковки бинарника, инструкция написана для использования на различных дистрибутивах Linux, например, Ubuntu, Rocky, Astra, РЕД ОС.

Наше ознакомление с программным продуктом разобьем на разделы:

Подготовка системы

Перед тем, как приступить к установке Kafka, выполним предварительную настройку системы. Рассмотрим варианты для дистрибутивов на основе deb и rpm.

Установка служебных пакетов

Для работы нам понадобятся некоторые утилиты:

  • curl — отправка http-запросов, в том числе, для загрузки файлов.
  • tar — распаковка и создание архивов.
  • wget — загрузка файлов по сети.

Установим их заранее.

а) Для систем на основе DEB:

apt install curl tar wget

б) Для систем на основе RPM:

yum install curl tar wget

Настройка брандмауэра

Для работы с кафкой по сети необходимо открыть порт 9092. В зависимости от утилиты управления брандмауэром наши команды будут отличаться.

а) Для Iptables (как правило, для deb):

iptables -I INPUT -p tcp —dport 9092 -j ACCEPT

Для сохранения правил используем утилиту iptables-persistent:

apt install iptables-persistent

б) Для Firewalld (как правило, для rpm):

firewall-cmd —permanent —add-port=9092/tcp

firewall-cmd —permanent —reload

Установка Kafka

Как упоминалось выше, установка программного продукта выполняется путем распаковки бинарника, а также настройки и установки необходимых компонентов. Итого, мы должны:

  1. Установить JDK.
  2. Распаковать бинарник Kafka.
  3. Внести правки в конфигурационный файл.
  4. Создать юнит-файлы для запуска сервисов zookeeper и kafka.

Установка OpenJDK

В зависимости от типа Linux наши команды будут немного отличаться.

а) Для систем на основе DEB:

apt install default-jdk

б) Для систем на основе RPM:

yum install java-11-openjdk-devel

Смотрим версию java:

Мы должны увидеть что-то на подобие:

openjdk version «11.0.10» 2021-01-19
OpenJDK Runtime Environment .

Установка и настройка Kafka

Переходим на страницу загрузки Kafka и копируем ссылку на скачиваем приложения:

Копируем ссылку для загрузки архива kafka

Используя скопированную ссылку, скачиваем бинарник:

Создадим каталог, куда установим кафку:

Распакуем скачанный архив в созданный каталог:

tar zxf kafka_*.tgz -C /opt/kafka —strip 1

Интернет сообщество сразу рекомендует внести небольшую правку в конфигурационный файл. Откроем его:

* данная директива разрешает ручное удаление темы из кафки.

Настройка запуска kafka

Осталось настроить запуск кафки в качестве сервиса. Мы создадим отдельного пользователя и создадим два юнита systemd — один для zookeeper, второй для kafka.

Для создания пользователя вводим:

useradd -r -c ‘Kafka broker user service’ kafka

Назначим владельцем созданного пользователя для каталога кафки:

chown -R kafka:kafka /opt/kafka

Создаем первый юнит-файл:

[Unit]
Description=Zookeeper Service
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure

Создаем второй файл для кафки:

[Unit]
Description=Kafka Service
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c ‘/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>Hello, World from Kafka» | /opt/kafka/bin/kafka-console-producer.sh —broker-list localhost:9092 —topic Test

* в данном примере мы отправляем в наш сервер сообщение Hello, World from Kafka.

Попробуем достать сообщение. Выполняем команду:

/opt/kafka/bin/kafka-console-consumer.sh —bootstrap-server localhost:9092 —topic Test —from-beginning

* опция from-beginning позволяет увидеть все сообщения, которые были отправлены в брокер до создания подписчика (отправки запроса на чтения данных из кафки).

Мы должны увидеть:

Hello, World from Kafka

При этом мы подключимся к серверу в интерактивном режиме. Не спешим выходить. Откроем вторую сессию SSH и еще раз введем:

echo «Hello, World from Kafka again» | /opt/kafka/bin/kafka-console-producer.sh —broker-list localhost:9092 —topic Test

Вернемся к предыдущей сессии SSH и мы должны увидеть:

Hello, World from Kafka
Hello, World from Kafka again

Можно считать, что программа минимум выполнена — Kafka установлена и работает.

Источник: www.dmosk.ru

Рейтинг
( Пока оценок нет )
Загрузка ...
EFT-Soft.ru