Недавно я поделился опытом о том, какие параметры мы в команде чаще всего используем для Kafka Producer и Consumer, чтобы приблизиться к гарантированной доставке. В этой статье хочу рассказать, как мы организовали повторную обработку события, полученного из Kafka, в результате временной недоступности внешней системы.
Современные приложения работают в очень сложной среде. Бизнес-логика, обернутая в современный технологический стек, работающая в Docker-образе, который управляется оркестратором вроде Kubernetes или OpenShift, и коммуницирующая с другими приложениями или enterprise-решениями через цепочку физических и виртуальных маршрутизаторов. В таком окружении всегда что-то может сломаться, поэтому повторная обработка событий в случае недоступности одной из внешних систем — важная часть наших бизнес-процессов.
Как было до Kafka
Ранее в проекте мы использовали IBM MQ для асинхронной доставки сообщений. При возникновении какой-либо ошибки в процессе работы сервиса полученное сообщение могло быть помещено в dead-letter-queue (DLQ) для дальнейшего ручного разбора. DLQ создавался рядом с входящей очередью, перекладывание сообщения происходило внутри IBM MQ.
HOW TO ACCESS OUR FREE ENGLISH CONVERSATION CLASSES ON ZOOM
Если ошибка имела временный характер и мы могли это определить (например, ResourceAccessException при HTTP-вызове или MongoTimeoutException при запросе в MongoDb), то в силу вступала стратегия повторных вызовов. Вне зависимости от ветвления логики приложения, исходное сообщение перекладывалось или в системную очередь для отложенной отправки, или в отдельное приложение, которое когда-то давно было сделано для повторной отправки сообщений. При этом в заголовок сообщения записывается номер повторной отправки, который привязан к интервалу задержки или к концу стратегии на уровне приложения. Если мы достигли конца стратегии, но внешняя система все еще недоступна, то сообщение будет помещено в DLQ для ручного разбора.
Поиск решения
Поискав в интернете, можно найти следующее решение. Если коротко, то предлагается завести по топику на каждый интервал задержки и реализовать на стороне приложения Consumer’ы, которые будут вычитывать сообщения с необходимой задержкой.
Несмотря на большое количество положительных отзывов, оно кажется мне не совсем удачным. В первую очередь потому, что разработчику, кроме реализации бизнес-требований, придется потратить много времени на реализацию описанного механизма.
Кроме того, если на Kafka-кластере включено управление доступами, то придется потратить какое-то время на заведение топиков и обеспечение необходимых доступов к ним. В дополнение к этому нужно будет подбирать правильный параметр retention.ms для каждого из ретрай-топиков, чтобы сообщения успевали повторно отправляться и не пропадали из него. Реализацию и запрос доступов придется повторять для каждого существующего или нового сервиса.
Давайте теперь посмотрим, какие механизмы для повторной обработки сообщения предоставляет нам spring в целом и spring-kafka в частности. Spring-kafka имеет транзитивную зависимость на spring-retry, который предоставляет абстракции для управления разными BackOffPolicy. Это довольно гибкий инструмент, но его значительным недостатком является хранение сообщений для повторной отправки в памяти приложения. Это значит, что перезапуск приложения из-за обновления или ошибки во время эксплуатации приведет к потере всех сообщений, ожидающих повторной обработки. Так как этот пункт критичен для нашей системы, мы не стали рассматривать его дальше.
Урок 24. Работа с датой и временем. Java Date Time API
Сама spring-kafka предоставляет несколько реализаций ContainerAwareErrorHandler, например SeekToCurrentErrorHandler, с помощью которого можно, не смещая offset в случае возникновения ошибки, обработать сообщение позже. Начиная с версии spring-kafka 2.3 появилась возможность задавать BackOffPolicy.
Этот подход позволяет повторно обрабатываемым сообщениям переживать рестарт приложения, но механизм DLQ по-прежнему отсутствует. Именно этот вариант мы выбрали в начале 2019 года, оптимистично полагая, что DLQ не понадобится (нам повезло и он действительно не понадобился за несколько месяцев эксплуатации приложения с такой системой повторной обработки). Временные ошибки приводили к срабатыванию SeekToCurrentErrorHandler. Остальные ошибки печатались в лог, приводили к смещению offset, и обработка продолжалась со следующим сообщением.
Итоговое решение
Реализация, основанная на SeekToCurrentErrorHandler, подтолкнула нас к разработке собственного механизма для повторной отправки сообщений.
Прежде всего мы хотели использовать уже имеющийся опыт и расширить его в зависимости от логики приложения. Для приложения с линейной логикой оптимальным было бы прекратить считывание новых сообщений в течение небольшого промежутка времени, заданного в рамках стратегии повторных вызовов. Для остальных приложений хотелось иметь единую точку, которая бы обеспечивала выполнение стратегии повторных вызовов. В дополнение эта единая точка должна обладать функциональностью DLQ для обоих подходов.
Сама стратегия повторных вызовов должна храниться в приложении, которое отвечает за получение следующего интервала при возникновении временной ошибки.
Остановка Consumer’a для приложения с линейной логикой
При работе с spring-kafka код для остановки Consumer’a может выглядеть примерно так:
public void pauseListenerContainer(MessageListenerContainer listenerContainer, Instant retryAt) < if (nonNull(retryAt) listenerContainer.isRunning()) < listenerContainer.stop(); taskScheduler.schedule(() ->listenerContainer.start(), retryAt); return; > // to DLQ >
В примере retryAt — это время, когда нужно заново запустить MessageListenerContainer, если он еще работает. Повторный запуск произойдет в отдельном потоке, запущенном в TaskScheduler, реализацию которого тоже предоставляет spring.
Значение retryAt мы находим следующим способом:
- Ищется значение счетчика повторных вызовов.
- В соответствии со значением счетчика ищется текущий интервал задержки в стратегии повторных вызовов. Стратегия объявляется в самом приложении, для ее хранения мы выбрали формат JSON.
- Найденный в JSON-массиве интервал содержит в себе количество секунд, через которое необходимо будет повторить обработку. Это количество секунд прибавляется к текущему времени, образуя значение для retryAt.
- Если интервал не найден, то значение retryAt равно null и сообщение отправится в DLQ для ручного разбора.
При таком подходе остается только сохранить количество повторных вызовов для каждого сообщения, которое находится сейчас на обработке, например в памяти приложения. Сохранение счетчика попыток в памяти не критично для этого подхода, так как приложение с линейной логикой не может выполнять обработку в целом. В отличие от spring-retry перезапуск приложения приведет не к потере всех сообщений для повторной обработки, а просто к перезапуску стратегии.
Этот подход помогает снять нагрузку с внешней системы, которая может быть недоступна из-за очень большой нагрузки. Другими словами, в дополнение к повторной обработке мы добились реализации паттерна circuit breaker.
В нашем случае порог ошибки равен всего 1, а чтобы минимизировать простой системы из-за временного сетевого перебоя, мы используем очень гранулярную стратегию повторных вызовов с небольшими интервалами задержки. Это может не подойти для всех приложений группы компаний, поэтому соотношение между порогом ошибки и величиной интервала нужно подбирать, опираясь на особенности системы.
Отдельное приложение для обработки сообщений от приложений с недетерминированной логикой
Вот пример кода, отправляющего сообщение в такое приложение (Retryer), которое выполнит повторную отправку в топик DESTINATION при достижении времени RETRY_AT:
public void retry(ConsumerRecord record, String retryToTopic, Instant retryAt, String counter, String groupId, Exception e) < Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders()); ListarrayOfHeaders = new ArrayList<>(Arrays.asList(headers.toArray())); updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes); updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes); updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, () -> Integer.toString(record.partition()).getBytes()); if (nonNull(retryAt)) < updateHeader(arrayOfHeaders, COUNTER, counter::getBytes); updateHeader(arrayOfHeaders, SEND_TO, «retry»::getBytes); updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes); >else < updateHeader(arrayOfHeaders, REASON, ExceptionUtils.getStackTrace(e)::getBytes); updateHeader(arrayOfHeaders, SEND_TO, «backout»::getBytes); >ProducerRecord messageToSend = new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders); kafkaTemplate.send(messageToSend); >
Из примера видно, что много информации передается в хедерах. Значение RETRY_AT находится так же, как и для механизма повтора через остановку Consumer’a. Помимо DESTINATION и RETRY_AT мы передаем:
- GROUP_ID, по которому группируем сообщения для ручного анализа и упрощения поиска.
- ORIGINAL_PARTITION, чтобы постараться сохранить тот же Consumer для повторной обработки. Этот параметр может быть равен null, в таком случае новая partition будет получена по ключу record.key() оригинального сообщения.
- Обновленное значение COUNTER, чтобы следовать стратегии повторных вызовов.
- SEND_TO — константа, показывающая, отправить ли сообщение на повторную обработку по достижении RETRY_AT или поместить в DLQ.
- REASON — причина, по которой обработка сообщения была прервана.
Retryer сохраняет сообщения для повторной отправки и ручного разбора в PostgreSQL. По таймеру запускается задача, которая находит сообщения с наступившим RETRY_AT и отправляет их обратно в партицию ORIGINAL_PARTITION топика DESTINATION с ключом record.key().
После отправки сообщения удаляются из PostgreSQL. Ручной разбор сообщений происходит в простом UI, который взаимодействует с Retryer по REST API. Основными его особенностями являются переотправка или удаление сообщений из DLQ, просмотр информации об ошибке и поиск сообщений, например по имени ошибки.
Так как на наших кластерах включено управление доступом, необходимо дополнительно запрашивать доступы к топику, который слушает Retryer, и дать возможность Retryer’у писать в DESTINATION топик. Это неудобно, но, в отличие от подхода с топиком на интервал, у нас появляется полноценная DLQ и UI для управления ею.
Бывают случаи, когда входящий топик читают несколько разных consumer-групп, приложения которых реализуют разную логику. Повторная обработка сообщения через Retryer для одного из таких приложений приведет к дубликату на другом. Чтобы защититься от этого, мы заводим отдельный топик для повторной обработки. Входящий и retry-топик может читать один и тот же Consumer без каких-либо ограничений.
По умолчанию этот подход не предоставляет возможности circuit breaker’a, однако его можно добавить в приложение с помощью spring-cloud-netflix или нового spring cloud circuit breaker, обернув места вызовов внешних сервисов в соответствующие абстракции. Кроме того, появляется возможность выбора стратегии для bulkhead паттерна, что тоже может быть полезно. Например, в spring-cloud-netflix это может быть thread pool или семафор.
Вывод
В результате у нас получилось отдельное приложение, которое позволяет повторить обработку сообщения при временной недоступности какой-либо внешней системы.
Одним из главных преимуществ приложения является то, что им могут пользоваться внешние системы, работающие на том же Kafka-кластере, без значительных доработок на своей стороне! Такому приложению необходимо будет только получить доступ к retry-топику, заполнить несколько Kafka-заголовков и отправить сообщение в Retryer. Не нужно поднимать никакой дополнительной инфраструктуры. А чтобы уменьшить количество перекладываемых сообщений из приложения в Retryer и обратно, мы выделили приложения с линейной логикой и сделали в них повторную обработку через остановку Consumer.
- Блог компании TINKOFF
- Java
- Распределённые системы
- Микросервисы
Источник: habr.com
Schedule consumer dialogue 1.0.0.1
Schedule consumer dialogue это программное обеспечение Shareware в категории (2), разработанная Schedule consumer dialogue.
Проверяли обновления 63 раз пользователями нашего клиентского приложения UpdateStar в прошлом месяце.
Последняя версия Schedule consumer dialogue-1.0.0.1, выпущенный на 01.06.2014. Первоначально он был добавлен в нашу базу данных на 11.07.2013. Самой распространенной версией является 1.0.0.1, который используется в 50% всех установок.
Schedule consumer dialogue работает на следующих операционных системах: Windows.
Schedule consumer dialogue не был оценен нашими пользователями еще.
Написать обзор для Schedule consumer dialogue!
Установки
63 пользователи UpdateStar были установлены в прошлом месяце Schedule consumer dialogue.
Последние обновления
26.12.2022 | Webcam and Screen Recorder 8.1.728 |
26.12.2022 | TimeTrex Time and Attendance 16.2.2.1052 |
26.12.2022 | Fake Webcam 7.4.358 |
26.12.2022 | GHC Timetables EN 19.2.85 |
25.12.2022 | Batch Excel to PDF Converter 2022.14.1225 |
Источник: schedule-consumer-dialogue.updatestar.com
scheduler.exe — что это за процесс?
Данный процесс может принадлежать разному ПО, в большинстве случаев отвечает за работу функции расписания, планировщика.
Например присутствует программа по очистке ПК. В настройках указали — запускать каждый день при включении компьютера, или каждый день в 10 00 утра. За выполнение очистки в указанное время — может отвечать компонент, название которого — scheduler.exe. Это только пример, под данным названием спокойно может быть и вирус, рекламный модуль, сомнительное ПО.
Как выяснить от какой программы процесс scheduler.exe? Откройте диспетчер задач. Найдите процесс. Нажмите правой кнопкой > пункт открыть расположение. Откроется папка с выделенным файлом scheduler.exe. Окно папки в верхней части содержит путь (слева стрелочка вверх).
Нажмите одни раз по нему — вы увидите полный путь папки, где расположен scheduler.exe. При упоминании C:Program Files — после будет название ПО, к котором принадлежит процесс. Если путь содержит C:Windows — видимо это системный модуль или вирус, который маскируется под системный компонент. В любом случае путь файла сможет намекнуть к какому приложению относится процесс, или это система.
Что делать дальше? Грузить scheduler.exe ПК не должен. Если грузит — возможно вирус, например майнер. Необходимо просканировать компьютер антивирусными утилитами против опасных вирусов и рекламных модулей. Всего существует три лучших утилиты: Dr.Web CureIT! (против опасных вирусов), AdwCleaner и HitmanPro (против рекламных модулей).
Источник: 990x.top
Что такое usoclient.exe в Windows 10?
При каждом запуске Windows 10 вы видите черное окно с надписью WINDOWSsystem32usoclient.exe, которое затем быстро исчезает? Это системный процесс или все же вирус? В этом посте постараемся ответить на некоторые из ваших вопросов.
Файл usoclient.exe находится в папке System32. Свойства файла показывают, что его размер составляет 19.5 Кб. Онлайн-анализаторы подозрительных файлов VirusTotal и Jotti подтверждают, что usoclient.exe абсолютно чист, т.е. не является зловредным. Но если вы обнаружили файл с этим именем в любой другой папке, вполне вероятно, что это вирус.
Оригинальный файл usoclient.exe (тот, что находится в WINDOWSsystem32usoclient.exe) отвечает за выполнение автоматического сканирования Центра обновления Windows. В этом вы можете убедиться сами, открыв штатный Планировщик заданий. Упоминание об этом процессе вы найдете в разделе Библиотека планировщика заданий -> Microsoft -> Windows -> UpdateOrchestrator.
Причиной всплывающего окна консоли при каждом запуске системы является задача Schedule Scan. Это ясно. Не ясно только почему на одних компьютерах окно появляется, а на других нет (в моем случае на 1 из 3), хотя упомянутая выше задача есть на каждом из них. В любом случае, если вам мешает это окно, просто отключите Schedule Scan и больше оно вас не побеспокоит. Отключение этой задачи никак не повредит вашей системе, только не забывайте периодически вручную выполнять проверку на наличие новых обновлений.
Источник: windowstips.ru