Platforma Ferryt potrafi komunikować się z zewnętrznymi systemami w celu wymiany wiadomości i komunikatów za pośrednictwem platformy Apache Kafka. Kafka staje się coraz bardziej popularną platformą wymiany komunikatów. Stosowana jest przez instytucje i korporacje, gdzie wymagana jest wysokowydajna komunikacja, strumieniowe przesyłanie danych i pipeline-y danych.
Celem artykułu jest omówienie mechanizmów integracji Apache Kafka z platformą Ferryt. Nie będziemy omawiać mechaniki działania samej Kafki. Dla czytelników, którzy są zainteresowani czym jest Kafka, polecamy ten materiał.
Apache Kafka w Ferryt
Platforma Ferryt może wystąpić zarówno jako Producer – system publikujący na Kafkę, jak i Consumer – system odczytujący zdarzenia z Kafki. Ferryt potrafi obsłużyć wiele formatów danych, w szczególności także schematy Apache Avro™.
Trochę więcej o Apache Avro™
Apache Avro™ jest systemem serializacji danych, zoptymalizowanym dla strumieniowego przetwarzania danych. Jest to jeden z podstawowych formatów danych używanych w Kafka, który opiera się na schematach danych JSON.
Oto przykładowy plik ze schematem Avro:
{ "name": "Registry", "type": "record", "fields": [{ "name": "schemaName", "type": ["null", "string"] }, { "name": "Id", "type": ["null", "string"] }, { "name": "modifyTime", "type": ["null", { "type": "long", "logicalType": "timestamp-millis" } ] }, { "name": "modifyUser", "type": ["null", "string"] }, { "name": "data", "type": ["null", { "name": "data", "type": "record", "fields": [{ "name": "Temat", "type": ["null", "string"] }, { "name": "Tresc", "type": ["null", "string"] } ] } ] } ] }
Konfiguracja z punktu widzenia architekta rozwiązań biznesowych w Ferryt
Rejestracja schema komunikatu
Pierwszym krokiem, niezależnie od tego czy rejestrujemy Ferryt jako producera czy consumera Kafki, jest zarejestrowanie schematu komunikatu. Posiadając plik ze schematem Avro należy utworzyć na jego podstawie typ danych.

Rejestrowanie Ferryt jako producera
Jeżeli w danym rozwiązaniu biznesowym konieczne jest wysyłanie komunikatów na Kafka to należy zarejestrować Ferryt jako producera zdarzeń.
W tym celu należy zarejestrować w Ferryt serwis zewnętrzny, podając parametry Kafki:
- Adres brokerów
- Topic
- Sposób uwierzytelnienia
- Format komunikatu
Więcej na temat mechanizmów integracji w Ferryt, w tym serwisów zewnętrznych, w tym materiale.
Tak skonfigurowany serwis zewnętrzny można używać w dowolnym miejscu w procesie. Wysłanie komunikatu na Kafkę następuje w momencie wywołania w runtime silnika workflow bloczka Serwis zewnętrzny.
Rejestrowanie Ferryt jako consumera
Aby Ferryt był consumerem zdarzeń Kafka to należy zarejestrować w Ferryt zdarzenie zewnętrzne podając ponownie:
- adres brokerów,
- topic,
- sposób uwierzytelnienia,
- format komunikatu,
- funkcję workflow, którą należy wywołać dla każdego odczytanego komunikatu.
Eventy z Kafki są odczytywane cyklicznie, co określony interwał czasowy, przez mechanizm schedulerów Ferryt. W trakcie jednego odczytu pobierana jest „paczka” nowych komunikatów, których nie było na topicu przy poprzednim odczycie. Dla każdego takiego komunikatu uruchamiana jest funkcja workflow. W tej funkcji architekt rozwiązania biznesowego może zamodelować dowolną aktywność wymaganą do wykonania po otrzymaniu komunikatu z Kafki, np. przekazanie do wniosku danych lub wywołanie akcji na wniosku.
Implementacja Kafka w Ferryt – Confluent Kafka
Ogólny schemat komunikacji pomiędzy Ferryt a serwerem Kafki przedstawiono na poniższym schemacie:

Za komunikację z serwerem Kafka odpowiada moduł Ferryt Kafka Connector. Jego zadaniem jest ustanowienie bezpośredniego połączenia z serwerem oraz wysłanie i pobranie komunikatu.
W Ferryt jest zaimplementowane rozwiązanie oparte na bibliotece do obsługi Confluent Kafka. Jest to rozszerzenie zbudowane przez twórców Apache Kafka®. Rozszerza ono listę zalet Kafki o funkcje klasy korporacyjnej, usuwając jednocześnie obciążenia związane z zarządzaniem lub monitorowaniem Kafki.
Do komunikacji używany jest Confluent.Kafka dla platformy.NET. Jest to klient Kafki dostępny w pakiecie nuGet.
Biblioteka ta zapewnia techniczną obsługę producera i consumera. Rozwiązanie jest kompatybilne ze wszystkimi brokerami Apache Kafka ®. W kontekście wersji .NET Framework jest kompatybilny z .NET Framework >= v4.6.2, .NET Core >= v1.0 i .NET Standard.
Tworzenie producera Kafki z wykorzystaniem komponentu Confluent.Kafka
W poniżej podanym przykładzie zaprezentowano fragment kodu obrazujący etap tworzenia [producera. Podane dane są przykładowe i mają na celu zaprezentować funkcjonalność.
1. W pierwszym kroku dodawana jest referencja do biblioteki Confluent:
using Confluent.Kafka;
2. Następnie tworzony jest słownik konfiguracyjny, w którym podawany jest adres brokera:
var config = new Dictionary<string, object> { { "bootstrap.servers", "host1:9092" }, };
3. Kolejnym krokiem jest utworzenie producera poprzez odniesienie się do wcześniej utworzonego słownika konfiguracyjnego. Wiadomości będą przesyłane w formacie binarnym:
using (var producer = new Producer<Null, byte[]>(config, null, new Confluent.Kafka.Serialization.ByteArraySerializer()))
4. Tworzenie wiadomości asynchronicznej z przykładową wiadomością:
Task<Message<Null, byte[]>> deliveryReport = producer.ProduceAsync("testTopic", null, Encoding.ASCII.GetBytes("wiadomość testowa"));
5. Końcowy kod, jeżeli zamkniemy go w metodę, powinien wyglądać tak:
public void CreateProducerSendMsg() { var config = new Dictionary<string, object> { { "bootstrap.servers", " host1:9092" }, }; using (var producer = new Producer<Null, byte[]>(config, null, new Confluent.Kafka.Serialization.ByteArraySerializer())) { Task<Message<Null, byte[]>> deliveryReport = producer.ProduceAsync("testTopic", null, Encoding.ASCII.GetBytes("wiadomość testowa")); }; }
Tworzenie consumera Kafki z wykorzystaniem komponentu Confluent.Kafka
Spójrzmy, jak wygląda kod:
1. Dodawana jest wymagana referencja do biblioteki:
using Confluent.Kafka;
2. Tworzony jest słownik konfiguracyjny z obowiązkową konfiguracją. W przykładzie przedstawiamy tylko te opcje, które są wymagane. Ferryt konfiguruje więcej opcji.
var options = new Dictionary<string, object> { { "group.id", "Ferryt"}, { "bootstrap.servers", " host1:9092"}, };
Właściwość group.id jest obowiązkowa i określa, do której grupy konsumenckiej należy consumer.
3. Utworzenie consumera, który zawiera skonfigurowany słownik:
using (var consumer = new Consumer<Null, byte[]>(options, null, new Confluent.Kafka.Serialization.ByteArrayDeserializer()))
4. Subskrypcja consumera – wymagane, aby konsument mógł dołączyć do group.id:
consumer.Subscribe("testTopic");
5. Pobranie wiadomości w formacie binarnym:
Message<Null, byte[]> msg; while (consumer.Consume(out msg, 1000)) { try { consumer.CommitAsync(msg).Wait(); return msg.Value; } catch (Exception ex) { throw; } }
6. Końcowy kod zamknięty w metodzie:
public byte[] CreateConsumerReciveMsg() { var options = new Dictionary<string, object> { { "group.id", "Ferryt"}, { "bootstrap.servers", " host1:9092"}, }; using (var consumer = new Consumer<Null, byte[]>(options, null, new Confluent.Kafka.Serialization.ByteArrayDeserializer())) { consumer.Subscribe("testTopic"); Message<Null, byte[]> msg; while (consumer.Consume(out msg, 1000)) { try { consumer.CommitAsync(msg).Wait(); return msg.Value; } catch (Exception ex) { throw; } } } return null; }
Podsumowanie
Mechanizmy integracyjne w Ferryt związane z obsługą Apache Kafka pozwalają na low-codowe przetwarzanie komunikatów z i do brokera Kafki. Specyfika Kafki umożliwia budowanie na platformie Ferryt rozwiązań biznesowych, gdzie komunikaty wysyłane są potokowo. Dzieje się tak bez obawy o niekorzystny wpływ na procesowane wniosków czy wydajność platformy.
Autorzy:
Krystyna Masińska – Developer
Dorota Wichowska – Senior Ferryt Manager, Team Leader