Przesyłaj strumieniowo dane za pomocą Amazon DocumentDB, Amazon MSK Serverless i Amazon MSK Connect

16 czerwca 2023

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

Powszechnym trendem w tworzeniu nowoczesnych aplikacji i przetwarzaniu danych jest wykorzystanie Apache Kafka jako standardowego mechanizmu dostarczania potoków danych i podejścia fan-out. Amazon Managed Streaming for Apache Kafka (Amazon MSK) to w pełni zarządzana, wysoce dostępna i bezpieczna usługa, która ułatwia programistom i managerom DevOps uruchamianie aplikacji na Apache Kafka w chmurze AWS bez konieczności posiadania wiedzy z zakresu zarządzania infrastrukturą Apache Kafka.

Bazy danych dokumentów, takie jak Amazon DocumentDB (z kompatybilnością z MongoDB), są coraz częściej używane, ponieważ programiści i właściciele aplikacji preferują elastyczność schematu JSON bez schematu w nowoczesnych aplikacjach. Amazon DocumentDB to skalowalna, trwała iw pełni zarządzana usługa bazy danych do obsługi krytycznych obciążeń MongoDB. Coraz częściej klienci używają Amazon MSK z Amazon DocumentDB do różnych zastosowań.

Przy pomocy tego artykułu autorzy prezentują, jak uruchomić i skonfigurować łącznik open-source MongoDB Kafka do przenoszenia danych między Amazon MSK i Amazon DocumentDB.

Amazon DocumentDB może działać zarówno jako ujście danych, jak i źródło danych dla Amazon MSK w różnych przypadkach użycia.

Amazon DocumentDB jako ujście danych

Poniżej przedstawiono przykładowe przypadki użycia, w których można użyć Amazon DocumentDB jako ujścia danych za Amazon MSK:

  • Strumieniowe przesyłanie danych do transmisji strumieniowej wideo na żywo lub wydarzeń flash sale: W przypadku dużej transmisji strumieniowej wideo lub wydarzenia flash sale generowane duże ilości danych dotyczących reakcji widzów lub strumienia kliknięć kupującego mogą być przekazywane do Amazon MSK jako nieprzetworzone dane. Możesz dalej przesyłać strumieniowo te dane do Amazon DocumentDB w celu dalszego przetwarzania i agregacji.
  • Przesyłanie strumieniowe danych telemetrycznych z urządzeń IoT lub danych o trafieniach w witrynach internetowych: W przypadku przesyłania strumieniowego danych telemetrycznych z urządzeń Internetu rzeczy (IoT), danych o trafieniach w witrynach internetowych lub danych meteorologicznych dane można przesyłać strumieniowo do Amazon DocumentDB za pomocą łącznika, a następnie przetwarzać (np. jako agregacja lub obliczenie min/max).
  • Odtwarzanie nagrań lub odzyskiwanie aplikacji w Amazon DocumentDB: W klastrze Amazon DocumentDB, zamiast przywracania całej kopii zapasowej, aplikacja może odtwarzać określone zmiany na poziomie elementów z Amazon MSK do kolekcji Amazon DocumentDB.

Amazon DocumentDB jako źródło danych

Poniżej przedstawiono przykładowe przypadki użycia, w których można wysyłać strumienie zmian Amazon DocumentDB do Amazon MSK:

  • Replikacja danych do innego klastra Amazon DocumentDB lub magazynów danych: Amazon MSK może być używany jako warstwa pośrednia do selektywnej replikacji kolekcji z jednego klastra Amazon DocumentDB do innego klastra lub innych magazynów danych.
  • Przenoszenie danych na potrzeby zaawansowanych analiz i uczenia maszynowego: Amazon DocumentDB oferuje bogate środowisko agregacji, ale w przypadku zaawansowanych analiz i uczenia maszynowego (ML) można utworzyć potok danych z Amazon DocumentDB do różnych innych magazynów danych. Możesz użyć Amazon MSK jako warstwy pośredniej do modyfikowania i filtrowania zdarzeń zmian przed załadowaniem ich do docelowego magazynu danych.

Złącze MongoDB Kafka może działać w obu przypadkach do przesyłania danych między Amazon DocumentDB i Amazon MSK.

Omówienie rozwiązania

MSK Connect to funkcja Amazon MSK, która ułatwia wdrażanie, monitorowanie i automatyczne skalowanie łączników, które przenoszą dane między klastrami Apache Kafka a systemami zewnętrznymi, takimi jak magazyny danych, jak Amazon DocumentDB, systemy plików i indeksy wyszukiwania.

W tym artykule autorzy korzystają z łącznika MongoDB Kafka działającego w MSK Connect, aby przenieść zmiany do i z Amazon DocumentDB do Amazon MSK.

Dzięki MSK Connect nie musisz udostępniać infrastruktury, aby uruchamiać łączniki. MSK Connect zapewnia środowisko bezserwerowe i skaluje liczbę procesów roboczych w górę i w dół, więc nie musisz udostępniać serwerów ani klastrów, a płacisz tylko za to, co jest potrzebne do przenoszenia danych przesyłanych strumieniowo do i z klastra MSK Kafka. Dzięki opcji automatycznego skalowania, którą oferuje MSK Connect, skaluje pracowników w zależności od wykorzystania procesora przez obciążenia.

Poniższy artykuł autorzy podzielil na dwie główne sekcje:

  • Amazon DocumentDB jako ujście – w pierwszej części tego artykułu omawióne zostaje dostarczanie danych do Amazon DocumentDB przez Amazon MSK przy użyciu łącznika.
  • Amazon DocumentDB jako źródło – w drugiej części tekstu autorzy omawiają pobieranie danych z Amazon DocumentDB przy użyciu tego samego łącznika i publikowanie ich w temacie Kafki dla dalszego konsumenta Kafki.

Poniższy diagram ilustruje architekturę i przepływ danych:

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

Wymagania wstępne

Aby móc śledzić omawiane zmiany w artykule, potrzebujesz następujących zasobów i konfiguracji:

  • Klaster Amazon DocumentDB.
  • Bezserwerowy klaster MSK.
  • Instancja Amazon Elastic Compute Cloud (Amazon EC2) ze skonfigurowaną powłoką Mongo i Javą.
  • Zasobnik Amazon Simple Storage Service (Amazon S3) do przechowywania wtyczki złącza i pliku zaufanych certyfikatów JVM.
  • Niestandardowa wtyczka wykorzystująca złącze MongoDB Kafka i dostawców konfiguracji Amazon MSK.
  • Zasady i role zarządzane przez klienta dla MSK Connect.
  • Rola dla instancji EC2.
  • Trust store dla JVM do łączenia Amazon DocumentDB z MSK Connect.
  • Punkty końcowe bramy dla MSK Connect w celu uzyskania dostępu Trust Store na Amazon S3.

Poniesiesz koszty na swoim koncie związane z zasobami Amazon DocumentDB, Amazon MSK i Amazon EC2. Możesz użyć kalkulatora cen AWS, aby oszacować koszt na podstawie swojej konfiguracji.

Wykonaj czynności opisane w tej sekcji, aby utworzyć omawiane zasoby.

Klaster Amazon DocumentDB

Możesz użyć istniejącego klastra opartego na instancjach lub utworzyć nowy klaster instancji Amazon DocumentDB.

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

Możesz także użyć elastycznego klastra Amazon DocumentDB dla przypadku użycia Sink.

Klaster Amazon MSK

Możesz użyć istniejącego klastra MSK Serverless lub utworzyć nowy klaster MSK Serverless za pomocą opcji szybkiego tworzenia. Klaster powinien zostać wdrożony w tym samym VPC, co klaster Amazon DocumentDB i skonfigurowany z tą samą grupą zabezpieczeń, która jest używana w Amazon DocumentDB. Twój klaster powinien mieć również następujące konfiguracje:

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

W przypadku klastrów MSK Serverless uwierzytelnianie oparte na rolach IAM jest ustawieniem domyślnym. W przypadku uwierzytelniania opartego na rolach IAM protokół TLS jest włączany automatycznie.

Instancja Amazon EC2 z Mongo Shell i skonfigurowaną Javą

Możesz wybrać istniejącą instancję Amazon EC2 lub skonfigurować nową. Autorzy używają instancji EC2 do celów testowych. Twoja instancja powinna mieć następujące konfiguracje:

  1. Wdróż instancję w tym samym VPC klastra Amazon DocumentDB i klastra MSK z tą samą grupą zabezpieczeń.
  2. Skonfiguruj grupę zabezpieczeń instancji do łączenia się z klastrem MSK (port 9098) i klastrem Amazon DocumentDB (port 27017) oraz z klastra MSK
  3. Musisz zainstalować powłokę mongo na instancji EC2. Aby uzyskać instrukcje, zobacz Instalowanie powłoki mongo.
  4. Zainstaluj Javę na instancji EC2:

sudo yum install java-11-amazon-corretto-headless -y

Zasobnik Amazon S3

Będziesz potrzebować zasobnika Amazon S3 do przechowywania wtyczki łącznika i pliku zaufanych certyfikatów JVM. Możesz użyć istniejącego zasobnika S3 lub utworzyć nowy zasobnik. Musisz upewnić się, że zasady dostępu do zasobnika S3 są prawidłowo skonfigurowane w następujący sposób. Zaktualizuj nazwę zasobnika Amazon S3 i vpc_id (gdzie utworzyłeś Amazon MSK, Amazon DocumentDB) w zasadach. Dzięki zasadom zasobników Amazon S3 możesz zabezpieczyć dostęp do obiektów w swoich zasobnikach, tak aby dostęp do nich mieli tylko użytkownicy/zasoby z odpowiednimi uprawnieniami.

 

{
    "Version": "2012-10-17",
    "Id": "Access-to-bucket-using-specific-VPC",
    "Statement": [
        {
            "Sid": "Access-to-specific-VPC-only",
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::<Amazon S3 Bucket>",
                "arn:aws:s3:::<Amazon S3 Bucket>/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:sourceVpc": "<vpc-id>"
                }
            }
        }
    ]
} 

Utwórz niestandardową wtyczkę za pomocą łącznika MongoDB Kafka

Wtyczka zawiera kod, który definiuje logikę łącznika. Musisz utworzyć niestandardową wtyczkę w Amazon MSK przy użyciu złącza Mongodb Kafka. Podczas późniejszego tworzenia łącznika MSK Connect należy go określić.

Dostawcy konfiguracji Apache Kafka integrują Twoje złącze z innymi systemami, takimi jak Amazon S3 do przechowywania pliku zaufanych certyfikatów, AWS Systems Manager Parameter Store do przechowywania hasła do zaufanego magazynu oraz AWS Secrets Manager do przechowywania nazwy użytkownika Amazon DocumentDB, hasła i innych poświadczeń.

W przypadku tego artykułu będziesz przechowywać złącze mongodb kafka i certyfikat magazynu zaufanych certyfikatów w zasobniku Amazon S3, które utworzyłeś w poprzednim kroku. Potrzebujesz dostawców konfiguracji, aby uzyskać dostęp do Amazon S3 z MSK connect.

Otwórz terminal, zaloguj się do instancji EC2 i wykonaj następujące kroki:

  1. Utwórz strukturę katalogów w następujący sposób:
docdb-connector
├── mongo-connector
│ └── <MONGODB-CONNECTOR-ALL>.jar
├── msk-config-providers
│ └── <MSK CONFIG PROVIDERS>
mkdir -p ~/docdb-connector
mkdir -p ~/docdb-connector/mongo-connector
mkdir -p ~/docdb-connector/msk-config-providers

Skopiuj plik JAR łącznika do katalogu ~/docdb-connector/mongo-connector i plik .zip dostawcy konfiguracji MSK do katalogu ~/docdb-connector/msk-config-providers.

Pobierz złącze MongoDB Kafka JAR v. 1.10 lub nowszy z GitHub:

cd ~/docdb-connector/mongo-connector
wget https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.10.0/mongo-kafka-connect-1.10.0-all.jar

Pobierz plik .zip dostawcy konfiguracji MSK i rozpakuj go:

cd ~/docdb-connector/msk-config-providers
wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.1.0/msk-config-providers-0.1.0-with-dependencies.zip
unzip msk-config-providers-0.1.0-with-dependencies.zip
rm msk-config-providers-0.1.0-with-dependencies.zip

Połącz oba pliki JAR i utwórz plik .zip:

cd ~;zip -r docdb-connector-plugin.zip docdb-connector

Przed utworzeniem niestandardowej wtyczki MSK prześlij plik docdb-connector-plugin.zip do zasobnika S3 utworzonego w poprzednim kroku. Możesz przesłać go z wiersza poleceń (zobacz poniższy kod) lub za pomocą konsoli Amazon S3.

cd ~;aws s3 cp docdb-connector-plugin.zip s3://<Amazon S3 Bucket>;

Teraz możesz utworzyć niestandardową wtyczkę dla MSK Connect, wykonując następujące czynności:

  1. W konsoli Amazon MSK wybierz Custom plugins w panelu nawigacyjnym i wybierz Create custom plugin.
  2. Podaj identyfikator URI S3, do którego została przesłana wtyczka łącznika.
  3. Wprowadź nazwę wtyczki.
  4. Wybierz Create custom plugin.

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

docdb-connector-plugin będzie aktywny i gotowy do utworzenia konektora.

Utwórz zarządzane przez klienta zasady i role dla MSK Connect

Utwórz zasady zarządzane przez klienta, aby uzyskać dostęp do klastra MSK Serverless z instancji MSK Connect i EC2. Zaktualizuj region i identyfikator konta w zasadach. Region powinien być taki sam, jak ten, w którym udostępniono klaster Amazon DocumentDB, klaster MSK i instancję EC2.

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "kafka-cluster:*",
            "Resource": "arn:aws:kafka:::*/*/*"
        }
    ]
}

Teraz utwórz rolę IAM z poprzednią zasadą, a także dołącz do tej roli zasady dostępu tylko do odczytu Amazon S3 zarządzane przez AWS (ponieważ MSK Connect musi uzyskać dostęp do certyfikatu magazynu zaufanych certyfikatów Amazon DocumentDB z Amazon S3).

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect 5

Zastąp/Dodaj następującą zasadę zaufania do roli IAM, aby usługa MSK Connect mogła ją przyjąć:

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kafkaconnect.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

Utwórz rolę dla instancji EC2

Użyj powyższej polityki zarządzanej przez klienta dla MSK Serverless Cluster, aby uzyskać dostęp do MSK z instancji EC2 i utworzyć rolę IAM i przypisać ją do instancji EC2. Do celów testowych autorzy wykorzystali instancję EC2.

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

Utwórz magazyn zaufanych certyfikatów dla JVM

Klaster Amazon DocumentDB jest domyślnie włączony SSL/TLS, a łącznik Kafka działa z wirtualną maszyną Java (JVM), więc musisz utworzyć magazyn zaufanych certyfikatów z hasłem. Aby uzyskać instrukcje, zapoznaj się z częścią Connecting Programmatically to Amazon DocumentDB. Utwórz katalog lokalny i skopiuj plik magazynu zaufanych certyfikatów (rds-truststore.jks). Jeśli wykonałeś kroki, aby poprawnie utworzyć magazyn zaufanych certyfikatów, plik będzie zlokalizowany w /tmp/certs.

Skopiuj plik magazynu zaufanych certyfikatów do zasobnika S3; łącznik używa tego pliku do łączenia się z Amazon DocumentDB. Możesz użyć tego samego zasobnika S3, w którym przechowywałeś wtyczkę złącza. Zobacz następujący kod:

cp /tmp/certs/rds-truststore.jks ~
cd ~;aws s3 cp rds-truststore.jks s3://<Amazon S3 Bucket>

Utwórz punkty końcowe bramy dla usługi Amazon S3, aby uzyskać dostęp do magazynu zaufanych certyfikatów

Ponieważ magazyn zaufanych certyfikatów jest przechowywany w Amazon S3, musisz skonfigurować punkt końcowy VPC bramy dla Amazon S3, aby łącznik mógł pobierać zaufany magazyn z Amazon S3.

Amazon DocumentDB jako ujście

W tej części artykułu autorzy skupią się na przypadku użycia ujścia, jak pokazano na poniższym diagramie. Omówią, jak stworzyć i uruchomić konektor (za pomocą MSK Connect) oraz użyć Amazon DocumentDB jako bazy danych sink do przenoszenia danych z tematu MSK Kafka, który jest generowany przez producenta Kafki.

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

Kroki konfiguracji są następujące:

  1. Skonfiguruj złącze jako złącze ujścia Amazon DocumentDB.
  2. Przetestuj złącze MongoDB Kafka z Amazon DocumentDB jako ujście.

Skonfiguruj łącznik jako łącznik ujścia usługi Amazon DocumentDB

Wykonaj następujące kroki:

  1. W konsoli Amazon MSK wybierz Connectors w okienku nawigacyjnym i wybierz Create connector.Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect
  2. Wybierz niestandardową wtyczkę utworzoną w ramach wymagań wstępnych, a następnie wybierz Next.Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect
  3. Podaj nazwę konektora w informacjach podstawowych.
  4. Wybierz klaster MSK Serverless z uwierzytelnianiem IAM.Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect
  5. W konfiguracji złącza wprowadź następujące konfiguracje. Zaktualizuj nazwę logowania Amazon DocumentDB, hasło, punkt końcowy klastra, port klastra, nazwę regionu, nazwę zasobnika S3 i hasło magazynu zaufanych certyfikatów.
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=sink-topic
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.tolerance=all
# Connection String with Plain text secrets and cluster domain details:
connection.uri=mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&readPreference=secondaryPreferred&retryWrites=false
# Connection String with usage of AWS Secrets Manager:
#connection.uri=mongodb://${sm:/docdb/db1:username}:${sm:/docdb/db1:password}@${sm:/docdb/db1:host}:${sm:/docdb/db1:port}/?ssl=true&retryWrites=false
database=sinkdatabase
collection=sinkcollection
connection.ssl.truststore=${s3import:<regionname>:<s3-bucket-name>/rds-truststore.jks}
# Truststore password in PLAIN view:
connection.ssl.truststorePassword=<truststore_password>
# Truststore password using AWS System Manager Parameter Store:
#connection.ssl.truststorePassword=${ssm::/docdb/truststorePassword/caCertPass}
config.providers= s3import,ssm,sm
config.providers.s3import.class=com.amazonaws.kafka.config.providers.S3ImportConfigProvider
config.providers.s3import.param.region=<regionname>
#config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
#config.providers.ssm.param.region=<regionname>
#config.providers.sm.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
#config.providers.sm.param.region=<regionname>

Konfiguracja zawiera następujące szczegóły:

  • connector.class – klasa Java dla łącznika. Jest to klasa odpowiedzialna za przenoszenie danych z Kafki.
  • task.max – maksymalna liczba zadań, które należy utworzyć dla tego łącznika.
  • topics – lista tematów Kafki obserwowanych przez ten łącznik ujścia. Nazwa tematu to sink-topic.
  • key.converter – klasa konwertera, która instruuje złącze, jak przetłumaczyć klucz z formatu szeregowego Kafki. Użyj konwertera klas ciągów.
  • value.converter – klasa konwertera, która instruuje złącze, jak przetłumaczyć wartość z formatu serializowanego Kafki. Masz dane JSON w swoim temacie Kafka, więc konfiguruj Kafka Connect, aby używał konwertera JSON.
  • value.converter.schemas.enable – domyślnie konwerter JSON będzie oczekiwał schematu JSON, ale ustawiono go jako false, ponieważ nie ma żadnego schematu.
  • connection-uri – Definiuje punkt końcowy do połączenia z klastrem Amazon DocumentDB. Autorzy używają punktu końcowego z opcją SSL. Należy zauważyć, że informacje o klastrze Amazon DocumentDB są przechowywane w AWS Secrets Manager zamiast zwykłego tekstu i dynamicznie pobierane podczas tworzenia konektora lub tworzenia i odzyskiwania zadania. Aby uzyskać więcej informacji, zobacz Znajdowanie punktów końcowych klastra.
  • database – docelowa baza danych Amazon DocumentDB. Użyj nazwy bazy danych sinkdb.
  • collection – nazwa kolekcji w bazie danych, do której mają zostać wprowadzone zmiany. Nazwa kolekcji to sinkcollection.
  • connection.ssl.truststore – określa lokalizację pliku magazynu zaufanych certyfikatów Amazon DocumentDB. Jest zdefiniowany jako format S3 URI z zasobnikiem i nazwą pliku.
  • connection.ssl.truststorePassword – w tym miejscu należy podać hasło magazynu zaufanych certyfikatów w postaci zwykłego tekstu. Możesz także umieścić hasło w magazynie parametrów i zdefiniować dostawców konfiguracji.

 

config.providers – aby zintegrować złącze Kafka z innymi systemami, takimi jak Amazon S3 do przechowywania pliku zaufanych certyfikatów, Parameter Store do przechowywania hasła do zaufanego magazynu i Secrets Manager do przechowywania nazwy użytkownika Amazon DocumentDB, hasła i innych szczegółów, potrzebujesz konfiguracja dostawców. W tym przypadku potrzebujesz tylko dostawcy konfiguracji Amazon S3, aby uzyskać dostęp do magazynu zaufanych certyfikatów.

  • config.providers – nazwa dostawcy konfiguracji. W tym przypadku „s3”.
  • config.providers.s3import.class – klasa Java dostawcy konfiguracji importu S3.
  • config.providers.s3import.param.region – region zasobnika S3 dostawcy konfiguracji.
  1. Wybierz rolę IAM utworzoną w celu uzyskania dostępu do klastra MSK i usługi Amazon S3, a następnie wybierz opcję NextPrzesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect.
  2. Wybierz opcję Deliver to Amazon CloudWatch Logs i wprowadź lokalizację dostarczania dziennika dla łącznika.
  3. Poczekaj, aż stan złącza zmieni się na Running.

Przetestuj złącze MongoDB Kafka z Amazon DocumentDB jako ujście

Aby przetestować łącznik, uruchom producenta Kafki, aby wypchnąć zmiany do tematu Kafki -documentdb_topic. Łącznik Kafka odczytuje szczegóły z tego tematu i umieszcza je w Amazon DocumentDB na podstawie konfiguracji.

  1. Aby uruchomić lokalnego producenta Kafki, musisz zalogować się do swojej instancji EC2 i pobrać dystrybucję binarną Apache Kafka oraz rozpakować archiwum w local_kafka: 
    mkdir ~/local_kafka;cd ~/local_kafka/
    cp /usr/lib/jvm/java-11-amazon-corretto.x86_64/lib/security/cacerts kafka_iam_truststore.jks
    wget https://dlcdn.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
    tar -xzf kafka_2.13-3.2.3.tgz
    ln -sfn kafka_2.13-3.2.3 kafka
  2. Aby użyć IAM do uwierzytelnienia w klastrze MSK, pobierz bibliotekę Amazon MSK dla IAM i skopiuj do lokalnego katalogu biblioteki Kafka, jak pokazano w poniższym kodzie. Aby uzyskać pełne instrukcje, zobacz Configure clients for IAM access control.
  3. W katalogu ~/local_kafka/kafka/config/ utwórz plik client-config.properties, aby skonfigurować klienta Kafka do używania uwierzytelniania IAM dla producenta i konsumentów konsoli Kafka: 
    ssl.truststore.location=/home/ec2-user/local_kafka/kafka_iam_truststore.jks
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  4. Zdefiniuj zmienną środowiskową BOOTSTRAP_SERVERS do przechowywania serwerów ładowania początkowego klastra MSK i lokalnie zainstaluj Kafkę w zmiennej środowiskowej path. 
    export BOOTSTRAP_SERVERS=<kafka_bootstarp_serverswithports>
    export PATH=$PATH:/home/ec2-user/local_kafka/kafka_2.13-3.2.3/bin
  5. Utwórz temat sink-topic Kafki, który zdefiniowałeś w konfiguracji konektora: 
    cd ~/local_kafka/kafka/config
    kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVERS --partitions 1 --topic sink-topic --command-config client-config.properties
  6. Uruchom producenta konsoli Kafka, aby zapisał w temacie MSK documentdb_topic i prześlij prawidłowe dokumenty JSON {"name":"DocumentDB NoSQL"} i {"test":"DocumentDB Sink Connector"}: 
    cd ~/local_kafka/kafka/config
    kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --producer.config client-config.properties --topic sink-topic
    {"name":"DocumentDB NoSQL"}
    {"test":"DocumentDB Sink Connector"}
  7. Otwórz drugi terminal i połącz się z klastrem Amazon DocumentDB za pomocą powłoki mongo. Poprzednie dwa dokumenty JSON powinny być częścią kolekcji sinkcollection w sinkdb: 
    use sinkdatabase
    db.sinkcollection.find()

Otrzymasz następujące dane wyjściowe:

 

{ "_id" : ObjectId("62c3cf2ec3d9010274c7a37e"), "name" : "DocumentDB NoSQL" }
{ "_id" : ObjectId("62c3d048c3d9010274c7a37f"), "test" : "DocumentDB Sink Connector" }

Powinieneś zobaczyć dokument JSON, który „wypchnąłeś” za pomocą producenta konsoli.

Amazon DocumentDB jako źródło

W tej sekcji autorzy omówią, jak utworzyć i uruchomić łącznik (przy użyciu kontenerów Docker) z platformą Kafka Connect oraz użyć Amazon DocumentDB jako źródłowej bazy danych do przeniesienia zmian kolekcji do tematu MSK Kafka.

Poniższy diagram ilustruje ten przepływ danych:

Przesylaj strumieniowo dane za pomoca Amazon DocumentDB Amazon MSK Serverless i Amazon MSK Connect

Teraz musisz skonfigurować inny łącznik dla źródłowego przypadku użycia, wykonując następujące czynności:

  1. Skonfiguruj Amazon DocumentDB dla strumienia zmian.
  2. Skonfiguruj łącznik jako łącznik źródłowy Amazon DocumentDB.
  3. Przetestuj złącze MongoDB Kafka z Amazon DocumentDB jako źródłem.

Skonfiguruj Amazon DocumentDB dla strumienia zmian

Łącznik odczytuje zmiany z kolekcji źródłowej za pomocą kursora strumienia zmian. Funkcja strumieni zmian w Amazon DocumentDB zapewnia uporządkowaną w czasie sekwencję zdarzeń zmian, które mają miejsce w Twoich kolekcjach.

 

W tym artykule autorzy używają kolekcji sourcecollection w bazie danych sourcedatabase w naszym klastrze Amazon DocumentDB.

Połącz się z klastrem Amazon DocumentDB i włącz strumień zmian dla kolekcji sourcecollection:

use sourcedatabase
db.createCollection("sourcecollection")
db.adminCommand({modifyChangeStreams: 1,database: "sourcedatabase",collection: "sourcecollection", enable:true})

Skonfiguruj łącznik jako łącznik źródłowy Amazon DocumentDB

Teraz skonfigurujesz łącznik źródłowy do odczytywania zmian w kolekcji Amazon DocumentDB i przechowywania tych zmian w temacie MSK. Łącznik odczytuje te zmiany ze skonfigurowanego przez Ciebie strumienia zmian Amazon DocumentDB.

Kroki tworzenia łącznika źródła Amazon DocumentDB są takie same jak w przypadku łącznika ujścia, z wyjątkiem konfiguracji łącznika.

W przypadku złącza źródłowego wykonaj podobne kroki od kroku 1 do kroku 8 konfiguracji złącza ujścia, ale użyj następujących konfiguracji złącza:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
errors.tolerance=all
connection.uri= mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&replicaSet=rs0&retryWrites=false
database= sourcedatabase
collection=sourcecollection
connection.ssl.truststore=${s3import:<regionname>:<Amazon S3 Bucket>/rds-truststore.jks}
connection.ssl.truststorePassword=<truststore_password>
config.providers=s3import,ssm,sm
config.providers.s3import.class=com.amazonaws.kafka.config.providers.S3ImportConfigProvider
config.providers.s3import.param.region=<regionname>
config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
config.providers.ssm.param.region=<regionname>
config.providers.sm.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
config.providers.sm.param.region=<regionname>

Konfiguracja zawiera typ łącznika i jego właściwości:

  • connector.class – klasa Java dla konektora. Jest to klasa odpowiedzialna za przeniesienie danych z kolekcji Amazon DocumentDB do tematu MSK.
  • task.max – Maksymalna liczba zadań, które należy utworzyć dla tego łącznika.
  • connection-uri – punkt końcowy Amazon DocumentDB do łączenia się z klastrem Amazon DocumentDB. Użyj punktu końcowego z opcją SSL.
  • database – źródłowa baza danych. W tym przypadku nazwa bazy danych to sourcedatabase.
  • collection – kolekcja w bazie danych do obserwowania zmian. Nazwa kolekcji to sourcecollection.
  • connection.ssl.truststore – określa lokalizację pliku magazynu zaufanych certyfikatów Amazon DocumentDB. Jest zdefiniowany jako format S3 URI z zasobnikiem i nazwą pliku.
  • connection.ssl.truststorePassword – tutaj należy dodać hasło magazynu zaufanych certyfikatów w postaci zwykłego tekstu. Możesz także przechowywać hasło w AWS Systems Manager Parameter Store i definiować dostawców konfiguracji.

Aby zintegrować złącze Kafka z innymi systemami, takimi jak Amazon S3, musisz zdefiniować dostawców konfiguracji.

Należy zauważyć, że connection.uri różni się od poprzedniego przypadku użycia ujścia. Nie uwzględniasz ustawienia preferencji odczytu jako dodatkowego w pliku connection.uri, ponieważ usługa Amazon DocumentDB obsługuje tylko strumień zmian w wystąpieniu podstawowym.

Poczekaj, aż stan złącza źródłowego Amazon DocumentDB zmieni się na Running.

Przetestuj łącznik z Amazon DocumentDB jako źródłem

Aby przetestować łącznik, wstaw dane do kolekcji Amazon DocumentDB. Łącznik Kafka odczytuje wstawione dane przy użyciu strumienia zmian kolekcji i zapisuje je w temacie Kafka.

  1. Otwórz nowy terminal na instancji EC2 i zaimportuj następujące zmienne środowiskowe: 
    export BOOTSTRAP_SERVERS=<kafka_bootstarp_serverswithports>
    export PATH=$PATH:/home/ec2-user/local_kafka/kafka_2.13-3.2.3/bin
  2. Utwórz temat Kafki sourcedatabase.sourcecollection: 
    cd ~/local_kafka/kafka/config
    kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVERS --partitions 1 --topic sourcedatabase.sourcecollection --command-config client-config.properties
  3. Uruchom konsumenta konsoli Kafka, aby odczytać szczegóły z tematu Kafka sourcedatabase.sourcecollection. Jeśli uruchomisz go na nowym terminalu, pamiętaj o utworzeniu zmiennej środowiskowej BOOTSTRAP_SERVERS. 
    cd ~/local_kafka/kafka/config
    kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config client-config.properties --topic sourcedatabase.sourcecollection --from-beginning
  4. W drugim terminalu dodaj rekord w sourcedatabase.sourceCollection klastra Amazon DocumentDB: 
    use sourcedatabase
    db.sourcecollection.insert({"name":"Amazon DocumentDB"})
  5. Wróć do pierwszego terminala, gdzie konsument konsoli czyta z tematu MSK: 
    {"_id": {"_data": "0164263f9e0000000701000000070000426b"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1680228254, "i": 7}}, "ns": {"db": "sourcedatabase", "coll": "sourcecollection"}, "documentKey": {"_id": {"$oid": "64263f9ea715466fe5ff0c9d"}}, "fullDocument": {"_id": {"$oid": "64263f9ea715466fe5ff0c9d"}, "name": "Amazon DocumentDB"}}

Możesz zaobserwować, że operacja wstawiania wykonana na kolekcji Amazon DocumentDB jest dostępna na konsoli konsumenta.

Możesz teraz przechwytywać zmiany w Amazon DocumentDB jako źródłowej bazie danych przy użyciu łącznika MongoDB Kafka, uruchamiając go w MSK Connect.

Porządkowanie

Aby wyczyścić zasoby używane na koncie, usuń je w następującej kolejności:

  • Instancja Amazon EC2
  • Rola IAM i polityka zarządzana przez klienta
  • Punkty końcowe bramy dla Amazon S3
  • Łączniki Amazon MSK
  • Niestandardowa wtyczka Amazon MSK
  • Klaster Amazon MSK Kafka
  • Klaster Amazon DocumentDB

Wnioski

W powyższym artykule autorzy omówili, jak uruchomić i skonfigurować złącze MongoDB Kafka do przenoszenia danych między Amazon DocumentDB i Amazon MSK dla różnych przypadków użycia ujścia i źródła. Możesz używać tego rozwiązania do różnych zastosowań, takich jak tworzenie potoków dla dużych transmisji strumieniowych wideo lub wydarzeń typu flash sale, strumieniowe przesyłanie danych telemetrycznych z urządzeń IoT, zbieranie danych o trafieniach w witrynach internetowych, replikowanie kolekcji z Amazon DocumentDB do innych magazynów danych oraz przenoszenie danych dla zaawansowanej analityki i ML.

Najpierw pokazali, jak używać łącznika do strumieniowego przesyłania danych z Amazon MSK do Amazon DocumentDB, gdzie Amazon DocumentDB działa jak ujście. Zaprezentowali również, jak skonfigurować łącznik w MSK Connect. W drugiej połowie tego posta przedstawili, jak przesyłać strumieniowo dane z Amazon DocumentDB do Amazon MSK, gdzie Amazon DocumentDB działa jako źródło. Omówili również różne konfiguracje dostępne w obu przypadkach użycia, które można dostosować do konkretnego przypadku użycia lub wymagań dotyczących obciążenia.

Źródło: AWS

Case Studies
Referencje

Rekomendujemy firmę Hostersi Sp. z o.o. jako odpowiedzialnego i wykwalifikowanego partnera, dbającego o wysoki poziom obsługi klienta. Zlecenie zostało wykonane profesjonalnie, według najlepszych standardów, w bardzo krótkim czasie.

Paweł Rokicki
Managing Director
W skrócie o nas
Specjalizujemy się w dostarczaniu rozwiązań IT w obszarach projektowania infrastruktury serwerowej, wdrażania chmury obliczeniowej, opieki administracyjnej i bezpieczeństwa danych.