Przedstawienie Amazon MSK Connect — przesyłanie strumieniowe danych do i z klastrów Apache Kafka za pomocą Managed Connectors

30 września 2021

Apache Kafka to platforma typu open source do tworzenia strumieni danych i aplikacji strumieniowych w czasie rzeczywistym. Podczas wydarzenia re: Invent 2018, AWS ogłosił Amazon Managed Streaming for Apache Kafka, w pełni zarządzaną usługę, która ułatwia tworzenie i uruchamianie aplikacji wykorzystujących Apache Kafka do przetwarzania danych przesyłanych strumieniowo.

Korzystając z Apache Kafka, dane przechwytywane są w czasie rzeczywistym ze źródeł, takich jak urządzenia IoT, ze zdarzeń związanych ze zmianą bazy danych czy tzw. clickstream’ów i dostarczane do miejsc docelowych, takich jak bazy danych i pamięć trwała.

Kafka Connect to komponent Apache Kafka, typu open source, który zapewnia framework do łączenia się z systemami zewnętrznymi, takimi jak bazy danych, magazyny typu key-value, indeksy wyszukiwania i systemy plików. Jednak ręczne uruchamianie klastrów Kafka Connect wymaga zaplanowania i udostępnienia wymaganej infrastruktury, radzenia sobie z utrzymaniem klastra i ich skalowania w odpowiedzi na zmiany obciążenia.

16 września AWS ogłosił nową funkcję, która ułatwia zarządzanie klastrami Kafka Connect. MSK Connect umożliwia skonfigurowanie i wdrożenie konektora za pomocą Kafka Connect poprzez zaledwie kilka kliknięć. MSK Connect udostępnia wymagane zasoby i konfiguruje klaster.n Stale monitoruje kondycję i stan dostarczania konektorów, tzw. patche (poprawki) i zarządza sprzętem bazowym oraz automatycznie skaluje konektory w celu dopasowania do zmian przepustowości. W rezultacie możesz skupić swoje zasoby na tworzeniu aplikacji zamiast na zarządzaniu infrastrukturą.

MSK Connect jest w pełni kompatybilna z Kafka Connect, co oznacza, że możesz migrować istniejące konektory bez zmian w kodzie. Nie potrzebujesz klastra MSK, aby korzystać z MSK Connect. Funkcja ta obsługuje klastry kompatybilne z Amazon MSK, Apache Kafka i Apache Kafka jako źródła i cele. Klastry te mogą być zarządzane samodzielnie lub zarządzane przez partnerów AWS i strony trzecie, o ile MSK Connect może prywatnie łączyć się z klastrami.

Korzystanie z MSK Connect z Amazon Aurora i Debezium

Aby przetestować MSK Connect, chcemy go użyć do strumieniowego przesyłania zdarzeń zmiany danych z jednej z naszych baz danych. Aby to zrobić, używamy Debezium, rozproszonej platformy open source do przechwytywania danych o zmianach, zbudowanej na bazie Apache Kafka.

Używamy bazy danych Amazon Aurora zgodnej z MySQL jako źródła i konektora Debezium MySQL z konfiguracją opisaną w tym diagramie architektonicznym:

Aurora Debezium

Aby używać naszej bazy danych Aurora z Debezium, musimy włączyć logowanie binarne w grupie parametrów klastra DB. Wykonujemy kroki opisane w artykule How do I turn on binary logging for my Amazon Aurora MySQL cluster.

Następnie musimy stworzyć niestandardową wtyczkę do MSK Connect. Wtyczka niestandardowa to zestaw plików JAR, które zawierają implementację jednego lub więcej konektorów, przekształceń lub konwerterów. Amazon MSK zainstaluje wtyczkę na urządzeniach roboczych klastra, w którym działa konektor.

Ze strony Debezium pobieramy wtyczkę konektora MySQL w najnowszej stabilnej wersji. Ponieważ MSK Connect akceptuje niestandardowe wtyczki w formacie ZIP lub JAR, konwertujemy pobrane archiwum do formatu ZIP i przechowujemy pliki JAR w głównym katalogu:

$ tar xzf debezium-connector-mysql-1.6.1.Final-plugin.tar.gz

$ cd debezium-connector-mysql

$ip z -9 ../debezium-connector-mysql-1.6.1.zip *

$ cd ..

Następnie używamy interfejsu AWS Command Line Interface (CLI), aby przesłać niestandardową wtyczkę do kontenera Simple Storage Service (Amazon S3) w tym samym regionie AWS, którego używamy dla MSK Connect:

$ aws s3 cp debezium-connector-mysql-1.6.1.zip s3://my-bucket/path/

W konsoli Amazon MSK pojawiła się nowa sekcja MSK Connect. Patrzymy na konektory i wybieramy Create connector. Następnie tworzymy niestandardową wtyczkę i przeglądamy nasze buckety S3, aby wybrać plik ZIP niestandardowej wtyczki, który przesłaliśmy wcześniej.

custom plugin

Wprowadzamy nazwę i opis wtyczki, a następnie wybieramy Next.

custom plugin name

Teraz, gdy konfiguracja wtyczki niestandardowej jest zakończona, rozpoczynamy tworzenie konektora. Podajemy nazwę i opis konektora.

connector properties

Mamy możliwość użycia samodzielnie zarządzanego klastra Apache Kafka lub takiego, który jest zarządzany przez MSK. Wybieramy jeden z klastrów MSK, który jest skonfigurowany do korzystania z uwierzytelniania IAM. Wybrany klaster MSK znajduje się w tej samej wirtualnej chmurze prywatnej (VPC), co baza danych Aurora. Aby nawiązać połączenie, klaster MSK i baza danych Aurora używają domyślnej grupy zabezpieczeń (default security group) dla VPC. Dla uproszczenia używamy konfiguracji klastra z ustawieniem auto.create.topics.enable na true.

Apache Kafka Cluster

W Connector configuration używamy następujących ustawień:

 

 

 

connector.class=io.debezium.connector.mysql.MySqlConnector

tasks.max=1

database.hostname=<aurora-database-writer-instance-endpoint>

database.port=3306

database.user=my-database-user

database.password=my-secret-password

database.server.id=123456

database.server.name=ecommerce-server

database.include.list=ecommerce

database.history.kafka.topic=dbhistory.ecommerce

database.history.kafka.bootstrap.servers=<bootstrap servers>

database.history.consumer.security.protocol=SASL_SSL

database.history.consumer.sasl.mechanism=AWS_MSK_IAM

database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

database.history.producer.security.protocol=SASL_SSL

database.history.producer.sasl.mechanism=AWS_MSK_IAM

database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

include.schema.changes=true

Niektóre z tych ustawień są ogólne i należy je określić dla dowolnego konektora. Na przykład:

  • class to klasa Java konektora.
  • max to maksymalna liczba zadań, które mogą zostać utworzone dla tego konektora.

Inne ustawienia są specyficzne dla konektora Debezium MySQL:

  • hostname zawiera punkt końcowy instancji zapisu w naszej bazie danych Aurora.
  • server.name to logiczna nazwa serwera bazy danych. Jest używany do nazw tematów Kafka stworzonych przez Debezium.
  • include.list zawiera listę baz danych obsługiwanych przez określony serwer.
  • history.kafka.topic to temat Kafka używany wewnętrznie przez Debezium do śledzenia zmian w schemacie bazy danych.
  • history.kafka.bootstrap.servers zawiera serwery typu bootstrap klastra MSK.
  • Ostatnie osiem wierszy (history.consumer.* i database.history.producer.*) umożliwia uwierzytelnianie uprawnień IAM w celu uzyskania dostępu do tematu historii bazy danych.

W Connector capacity możemy wybrać pomiędzy pojemnością autoskalowaną lub stałą.
W przypadku tej konfiguracji wybieramy Autoscaled i pozostawiamy wszystkie inne ustawienia domyślne.

connector capacity

Dzięki autoskalowaniu pojemności możemy skonfigurować te parametry:

  • MSK Connect Unit (MCU) count per worker — każda jednostka MCU zapewnia 1 vCPU mocy obliczeniowej i 4 GB pamięci.
  • Minimalna i maksymalna liczba workerów.
  • Progi wykorzystania autoskalowania — górny i dolny próg wykorzystania docelowego dla zużycia MCU w procentach, aby wyzwolić automatyczne skalowanie.

workers

W Worker configuration możesz użyć domyślnej konfiguracji dostarczonej przez Amazon MSK lub podać własną konfigurację. W naszej konfiguracji używamy domyślnej.

W Access permissions tworzymy rolę IAM. W zaufanych podmiotach dodajemy kafkaconnect.amazonaws.com, aby MSK Connect przejął tę rolę.

Rola jest używana przez MSK Connect do interakcji z klastrem MSK i innymi usługami AWS. Do naszej konfiguracji dodajemy:

Konektor Debezium potrzebuje dostępu do konfiguracji klastra, aby znaleźć czynnik replikacji (replication factor), który ma zostać użyty do utworzenia historii tematu. Z tego powodu dodajemy do polityki uprawnień akcję kafka-cluster:DescribeClusterDynamicConfiguration (odpowiednik ACL klastra Apache Kafka DESCRIBE_CONFIGS).

W zależności od konfiguracji może być konieczne dodanie większej liczby uprawnień do roli (na przykład w przypadku, gdy konektor potrzebuje dostępu do innych zasobów AWS, takich jak bucket S3). W takim przypadku należy dodać uprawnienia przed utworzeniem konektora.

W Security ustawienia uwierzytelniania i szyfrowania podczas przesyłania są pobierane z klastra MSK.

security

W Logs wybieramy dostarczanie logów do CloudWatch Logs, aby uzyskać więcej informacji o wykonaniu konektora. Korzystając z CloudWatch Logs, możemy łatwo zarządzać przechowywaniem oraz interaktywnie wyszukiwać i analizować dane z logów za pomocą CloudWatch Logs Insights. Wprowadzamy grupę logów ARN (jest to ta sama grupa logów, której  wcześniej używaliśmy w roli IAM), a następnie wybieramy Next.

CloudWatch Logs

Przeglądamy ustawienia, a następnie wybieramy opcję Create connector. Po kilku minutach konektor jest uruchomiony.

Testowanie MSK Connect z Amazon Aurora i Debezium

Teraz przetestujmy architekturę, którą właśnie skonfigurowaliśmy. Uruchamiamy instancję Amazon Elastic Compute Cloud (Amazon EC2), aby zaktualizować bazę danych i uruchamiamy kilku użytkowników Kafka, aby zobaczyć Debezium w akcji. Aby móc połączyć się zarówno z klastrem MSK, jak i bazą danych Aurora, używamy tego samego VPC i przypisujemy grupę zabezpieczeń default. Dodajemy również inną grupę bezpieczeństwa, która daje nam dostęp SSH do instancji.

Pobieramy binarną dystrybucję Apache Kafka i rozpakowujemy archiwum w katalogu domowym:

$ tar xvf kafka_2.13-2.7.1.tgz

Aby użyć IAM do uwierzytelniania w klastrze MSK, postępuj zgodnie z instrukcjami w instrukcji Amazon MSK Developer Guide, aby skonfigurować klientów do kontroli dostępu IAM. Pobieramy najnowszą stabilną wersję Amazon MSK Library for IAM:

W katalogu ~/kafka_2.13-2.7.1/config/ tworzymy plik client-config.properties, aby skonfigurować klienta Kafka do korzystania z uwierzytelniania IAM:

# Ustawia TLS dla szyfrowania i SASL dla authN.

security.protocol = SASL_SSL

 

# Identyfikuje używany mechanizm SASL.

sasl.mechanism = AWS_MSK_IAM

 

# Powiązuje implementację klienta SASL.

sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

 

# Hermetyzuje tworzenie podpisu SigV4 na podstawie wyodrębnionych poświadczeń.

# Klient SASL powiązany przez "sasl.jaas.config" wywołuje tę klasę.

sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

Dodajemy kilka linijek do naszego profilu Bash, aby:

  • Dodać pliki binarne Kafka do PATH.
  • Dodać bibliotekę MSK dla uprawnień IAM do CLASSPATH.
  • Utworzyć zmienną środowiskową BOOTSTRAP_SERVERS do przechowywania serwerów typu bootstrap naszego klastra MSK.

Bash

$ cat >> ~./bash_profile

export PATH=~/kafka_2.13-2.7.1/bin:$PATH

export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar

export BOOTSTRAP_SERVERS=<bootstrap servers>

Następnie otwieramy trzy połączenia terminalowe z instancją.

W pierwszym terminalu uruchamiamy konsumenta Kafka dla tematu o tej samej nazwie co serwer bazy danych (ecommerce-server). Ten temat jest używany przez Debezium do przesyłania strumieniowego zmian schematu (na przykład podczas tworzenia nowej tabeli).

$ cd ~/kafka_2.13-2.7.1/

$ kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS \

                            --consumer.config config/client-config.properties \

                            --topic ecommerce-server --from-beginning

W drugim terminalu uruchamiamy kolejnego konsumenta Kafka dla tematu o nazwie zbudowanej przez połączenie serwera bazy danych (ecommerce-server), bazy danych (ecommerce) i tabeli (orders). Ten temat jest używany przez Debezium do przesyłania strumieniowego zmian danych dla tabeli (na przykład po wstawieniu nowego rekordu).

$ cd ~/kafka_2.13-2.7.1/

$ kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS \

                            --consumer.config config/client-config.properties \

                            --topic ecommerce-server.ecommerce.orders --from-beginning

W trzecim terminalu instalujemy klienta MySQL za pomocą pakietu MariaDB i łączymy się z bazą danych Aurora:

$ sudo yum install mariadb

$ mysql -h <aurora-database-writer-instance-endpoint> -u <database-user> -p

Z tego połączenia tworzymy bazę danych ecommerce i tabelę dla naszej tabeli orders:

CREATE DATABASE ecommerce;

USE ecommerce

CREATE TABLE orders (

       order_id VARCHAR(255),

       customer_id VARCHAR(255),

       item_description VARCHAR(255),

       price DECIMAL(6,2),

       order_date DATETIME DEFAULT CURRENT_TIMESTAMP

);

Te zmiany bazy danych są przechwytywane przez konektor Debezium zarządzany przez MSK Connect i przesyłane strumieniowo do klastra MSK. W pierwszym terminalu widzimy informację o tworzeniu bazy danych i tabeli:

Struct{source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202831473,db=ecommerce,server_id=1980402433,file=mysql-bin-changelog.000003,pos=9828,row=0},databaseName=ecommerce,ddl=CREATE DATABASE ecommerce,tableChanges=[]}

Struct{source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202878811,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10002,row=0},databaseName=ecommerce,ddl=CREATE TABLE orders ( order_id VARCHAR(255), customer_id VARCHAR(255), item_description VARCHAR(255), price DECIMAL(6,2), order_date DATETIME DEFAULT CURRENT_TIMESTAMP ),tableChanges=[Struct{type=CREATE,id="ecommerce"."orders",table=Struct{defaultCharsetName=latin1,primaryKeyColumnNames=[],columns=[Struct{name=order_id,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=1,optional=true,autoIncremented=false,generated=false}, Struct{name=customer_id,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=2,optional=true,autoIncremented=false,generated=false}, Struct{name=item_description,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=price,jdbcType=3,typeName=DECIMAL,typeExpression=DECIMAL,length=6,scale=2,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=order_date,jdbcType=93,typeName=DATETIME,typeExpression=DATETIME,position=5,optional=true,autoIncremented=false,generated=false}]}}]}

Następnie wracamy do połączenia z bazą danych w trzecim terminalu, aby wstawić kilka rekordów w tabeli orders:

SQL

INSERT INTO orders VALUES ("123456", "123", "A super noisy mechanical keyboard", "50.00", "2021-08-16 10:11:12");

INSERT INTO orders VALUES ("123457", "123", "An extremely wide monitor", "500.00", "2021-08-16 11:12:13");

INSERT INTO orders VALUES ("123458", "123", "A too sensible microphone", "150.00", "2021-08-16 12:13:14");

W drugim terminalu widzimy informacje o rekordach wstawionych do tabeli orders:

Struct{after=Struct{order_id=123456,customer_id=123,item_description=A super noisy mechanical keyboard,price=50.00,order_date=1629108672000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10464,row=0},op=c,ts_ms=1629202993614}

Struct{after=Struct{order_id=123457,customer_id=123,item_description=An extremely wide monitor,price=500.00,order_date=1629112333000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10793,row=0},op=c,ts_ms=1629202993621}

Struct{after=Struct{order_id=123458,customer_id=123,item_description=A too sensible microphone,price=150.00,order_date=1629115994000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=11114,row=0},op=c,ts_ms=1629202993630}

Nasza architektura przechwytywania danych zmian jest już uruchomiona, a konektor jest w pełni zarządzany przez MSK Connect.

Dostępność i cennik

MSK Connect jest dostępny w następujących regionach AWS: Azja i Pacyfik (Mumbai), Azja i Pacyfik (Seul), Azja i Pacyfik (Singapur), Azja i Pacyfik (Sydney), Azja i Pacyfik (Tokio), Kanada (Centralna), UE (Frankfurt), UE (Irlandia), UE (Londyn), UE (Paryż), UE (Sztokholm), Ameryka Południowa (Sao Paulo), Wschodnie USA (Płn. Wirginia), Wschodnie USA (Ohio), Zachodnie USA (Płn. Kalifornia), USA Zachód (Oregon). Aby uzyskać więcej informacji, zobacz listę AWS Regional Services List.

Z MSK Connect płacisz za to, z czego korzystasz. Zasoby używane przez konektory mogą być skalowane automatycznie na podstawie obciążenia. Aby uzyskać więcej informacji, zobacz stronę z cennikiem Amazon MSK.

źródło: AWS

Case Studies
Referencje

Firma Hostersi pozwoliła nam osadzić ogólne zagadnienia programu Well Architected Framework w kontekście naszej firmy. Oszczędziło nam to wiele czasu i pozwoliło znaleźć lepiej dopasowane rozwiązania do specyfiki naszego biznesu. WAF był świetnym katalizatorem do wprowadzenie szeregu zmian w obszarze niezawodności, szybkości i bezpieczeństwa edrone. 

Piotr Stachowicz
CTO
W skrócie o nas
Specjalizujemy się w dostarczaniu rozwiązań IT w obszarach projektowania infrastruktury serwerowej, wdrażania chmury obliczeniowej, opieki administracyjnej i bezpieczeństwa danych.