Перейти к основному содержимому
Перейти к основному содержимому

Подключение Apache NiFi к ClickHouse

Community Maintained

Apache NiFi — это программное обеспечение с открытым исходным кодом для управления рабочими процессами, предназначенное для автоматизации потоков данных между программными системами. Оно позволяет создавать конвейеры данных ETL и поставляется с более чем 300 процессорами данных. Это пошаговое руководство показывает, как подключить Apache NiFi к ClickHouse в качестве источника и приёмника данных, а также загрузить тестовый набор данных.

Соберите данные для подключения

Чтобы подключиться к ClickHouse по HTTP(S) вам потребуется следующая информация:

Параметр(ы)Описание
HOST and PORTTypically, the port is 8443 when using TLS or 8123 when not using TLS.
DATABASE NAMEOut of the box, there is a database named default, use the name of the database that you want to connect to.
USERNAME and PASSWORDOut of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

ClickHouse Cloud service connect button

Choose HTTPS. Connection details are displayed in an example curl command.

ClickHouse Cloud HTTPS connection details

If you're using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.

Загрузите и запустите Apache NiFi

Для новой установки загрузите бинарный файл с https://nifi.apache.org/download.html и запустите его, выполнив команду ./bin/nifi.sh start

Загрузите JDBC-драйвер ClickHouse

  1. Перейдите на страницу релизов драйвера ClickHouse JDBC на GitHub и найдите последнюю доступную версию JDBC-драйвера
  2. В разделе релиза нажмите на "Show all xx assets" и найдите JAR-файл, в имени которого есть ключевое слово "shaded" или "all", например clickhouse-jdbc-0.5.0-all.jar
  3. Поместите JAR-файл в каталог, доступный Apache NiFi, и запомните его абсолютный путь

Добавьте службу контроллера DBCPConnectionPool и настройте её свойства

  1. Чтобы настроить Controller Service в Apache NiFi, перейдите на страницу конфигурации потока NiFi (NiFi Flow Configuration), нажав кнопку с изображением шестерёнки

    Страница NiFi Flow Configuration с выделенной кнопкой-шестерёнкой
  2. Выберите вкладку Controller Services и добавьте новый Controller Service, нажав кнопку + в правом верхнем углу

    Вкладка Controller Services с выделенной кнопкой добавления
  3. Найдите DBCPConnectionPool и нажмите кнопку "Add"

    Диалог выбора Controller Service с выделенным DBCPConnectionPool
  4. Только что добавленная служба контроллера DBCPConnectionPool по умолчанию будет находиться в статусе Invalid. Нажмите кнопку с иконкой «шестерёнки», чтобы начать настройку

    Список Controller Services с DBCPConnectionPool в статусе Invalid и выделенной кнопкой с иконкой шестерёнки
  5. В разделе Properties введите следующие значения

СвойствоЗначениеПримечание
URL подключения к базе данныхjdbc:ch:https://HOSTNAME:8443/default?ssl=trueЗамените HOSTNAME в URL подключения на нужное значение
Имя класса драйвера базы данныхcom.clickhouse.jdbc.ClickHouseDriver
Расположение драйвера базы данных/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarАбсолютный путь к JAR-файлу JDBC-драйвера ClickHouse
Пользователь базы данныхdefaultИмя пользователя ClickHouse
ПарольpasswordПароль ClickHouse
  1. В разделе Settings измените имя Controller Service на ClickHouse JDBC для удобства

    Диалог настройки DBCPConnectionPool с заполненными свойствами
  2. Активируйте Controller Service DBCPConnectionPool, нажав кнопку с изображением молнии, а затем кнопку «Enable»

    Список Controller Services с выделенной кнопкой с иконкой молнии

    Диалог подтверждения включения Controller Service
  3. Проверьте вкладку Controller Services и убедитесь, что Controller Service включён

    Список Controller Services с включённым сервисом ClickHouse JDBC

Чтение из таблицы с помощью процессора ExecuteSQL

  1. Добавьте процессор ExecuteSQL, а также соответствующие предыдущие и последующие процессоры

    Рабочая область NiFi с процессором ExecuteSQL в составе рабочего процесса
  2. В разделе "Properties" процессора ​​ExecuteSQL введите следующие значения

    PropertyValueRemark
    Database Connection Pooling ServiceClickHouse JDBCВыберите Controller Service, настроенный для ClickHouse
    SQL select querySELECT * FROM system.metricsВведите здесь запрос
  3. Запустите процессор ExecuteSQL

    Окно настройки процессора ExecuteSQL с заполненными свойствами
  4. Чтобы убедиться, что запрос был успешно выполнен, изучите один из FlowFile в выходной очереди

    Диалог списка очереди, показывающий FlowFile, готовые к проверке
  5. Переключите вид на "formatted", чтобы просмотреть результат выходного FlowFile

    Просмотрщик содержимого FlowFile, отображающий результаты запроса в отформатированном виде

Запись в таблицу с помощью процессоров MergeRecord и PutDatabaseRecord

  1. Чтобы записать несколько строк одним оператором INSERT, сначала нужно объединить несколько записей в одну. Это можно сделать с помощью процессора MergeRecord.

  2. В разделе Properties процессора MergeRecord введите следующие значения

    PropertyValueRemark
    Record ReaderJSONTreeReaderВыберите соответствующий Record Reader
    Record WriterJSONReadSetWriterВыберите соответствующий Record Writer
    Minimum Number of Records1000Установите большее значение, чтобы минимальное количество строк объединялось в одну запись. По умолчанию — 1 строка
    Maximum Number of Records10000Установите значение больше, чем "Minimum Number of Records". По умолчанию — 1 000 строк
  3. Чтобы убедиться, что несколько записей объединены в одну, просмотрите входные и выходные данные процессора MergeRecord. Обратите внимание, что выходные данные представляют собой массив из нескольких входных записей.

    Входные данные

    Входные данные процессора MergeRecord с отдельными записями

    Результат

    Выходные данные процессора MergeRecord с объединённым массивом записей
  4. В разделе Properties процессора PutDatabaseRecord введите следующие значения

    PropertyValueRemark
    Record ReaderJSONTreeReaderВыберите подходящий Record Reader
    Database TypeGenericОставьте значение по умолчанию
    Statement TypeINSERT
    Database Connection Pooling ServiceClickHouse JDBCВыберите службу контроллера ClickHouse
    Table NametblУкажите здесь имя вашей таблицы
    Translate Field NamesfalseУстановите в "false", чтобы вставляемые имена полей совпадали с именами столбцов
    Maximum Batch Size1000Максимальное количество строк на один INSERT. Это значение не должно быть меньше значения "Minimum Number of Records" в процессоре MergeRecord
  5. Чтобы подтвердить, что каждая операция вставки содержит несколько строк, убедитесь, что количество строк в таблице увеличивается как минимум на значение параметра «Minimum Number of Records», заданное в MergeRecord.

    Результаты выполнения запроса, показывающие количество строк в целевой таблице
  6. Поздравляем — вы успешно загрузили данные в ClickHouse с помощью Apache NiFi!