Programování

Vytvořeno pro realtime: Zasílání velkých objemů dat pomocí Apache Kafka, část 1

Když začal pohyb velkých dat, byl většinou zaměřen na dávkové zpracování. Distribuované datové úložiště a nástroje pro dotazování, jako jsou MapReduce, Hive a Pig, byly navrženy tak, aby zpracovávaly data v dávkách, nikoli nepřetržitě. Firmy by každou noc spouštěly více úloh, aby extrahovaly data z databáze, poté je analyzovaly, transformovaly a nakonec uložily. V poslední době podniky objevily sílu analýzy a zpracování dat a událostí jak se stávají, ne jen jednou za několik hodin. Většina tradičních systémů pro zasílání zpráv se však nezvyšuje, aby zvládla velká data v reálném čase. Inženýři společnosti LinkedIn tedy vytvořili a otevřeně vytvořili Apache Kafka: rámec distribuovaného zasílání zpráv, který splňuje požadavky velkých dat škálováním na komoditním hardwaru.

Během několika posledních let se společnost Apache Kafka objevila, aby vyřešila řadu případů použití. V nejjednodušším případě to může být jednoduchá vyrovnávací paměť pro ukládání protokolů aplikací. V kombinaci s technologií, jako je Spark Streaming, ji ​​lze použít ke sledování změn dat a provedení akce s těmito daty před jejich uložením do konečného cíle. Díky prediktivnímu režimu Kafka je výkonným nástrojem pro detekci podvodů, jako je kontrola platnosti transakce kreditní kartou, když k ní dojde, a nečekání na dávkové zpracování o několik hodin později.

Tento dvoudílný kurz představuje Kafku, počínaje instalací a spuštěním ve vývojovém prostředí. Získáte přehled architektury Kafky, následovaný úvodem do vývoje out-of-the-box systému Apache Kafka pro zasílání zpráv. Nakonec vytvoříte vlastní aplikaci výrobce / spotřebitele, která odesílá a spotřebovává zprávy prostřednictvím serveru Kafka. Ve druhé polovině tohoto kurzu se naučíte, jak rozdělit a seskupit zprávy a jak řídit, které zprávy spotřebuje spotřebitel Kafky.

Co je Apache Kafka?

Apache Kafka je systém zasílání zpráv vytvořený v měřítku pro velká data. Podobně jako Apache ActiveMQ nebo RabbitMq umožňuje Kafka aplikacím postaveným na různých platformách komunikovat prostřednictvím asynchronního předávání zpráv. Kafka se ale od těchto tradičnějších systémů pro zasílání zpráv liší zásadními způsoby:

  • Je navržen pro horizontální škálování přidáním dalších komoditních serverů.
  • Poskytuje mnohem vyšší propustnost pro výrobní i spotřebitelské procesy.
  • Lze jej použít k podpoře dávkových i reálných případů použití.
  • Nepodporuje JMS, rozhraní Java pro middleware orientované na zprávy.

Architektura Apache Kafky

Než prozkoumáme architekturu Kafky, měli byste znát její základní terminologii:

  • A výrobce je proces, který může publikovat zprávu k tématu.
  • A spotřebitel je proces, který se může přihlásit k odběru jednoho nebo více témat a využívat zprávy publikované k tématům.
  • A tematická kategorie je název zdroje, do kterého jsou zprávy publikovány.
  • A makléř je proces běžící na jednom stroji.
  • A shluk je skupina makléřů spolupracujících.

Architektura Apache Kafka je velmi jednoduchá, což může v některých systémech vést k lepšímu výkonu a propustnosti. Každé téma v Kafce je jako jednoduchý soubor protokolu. Když producent publikuje zprávu, server Kafka ji připojí na konec souboru protokolu pro dané téma. Server také přiřadí offset, což je číslo používané k trvalé identifikaci každé zprávy. Jak počet zpráv roste, hodnota každého posunu se zvyšuje; například pokud producent zveřejní tři zprávy, první může získat posun 1, druhý posun 2 a třetí posun 3.

Při prvním spuštění spotřebitele Kafka odešle požadavek na vyžádání na server a požádá o načtení všech zpráv pro konkrétní téma s hodnotou posunutí vyšší než 0. Server zkontroluje soubor protokolu pro dané téma a vrátí tři nové zprávy . Spotřebitel zprávy zpracuje a poté odešle požadavek na zprávy s posunem vyšší než 3 atd.

V Kafce je klient zodpovědný za zapamatování počtu offsetů a načítání zpráv. Server Kafka nesleduje ani nespravuje spotřebu zpráv. Ve výchozím nastavení bude server Kafka uchovávat zprávu po dobu sedmi dnů. Vlákno na pozadí na serveru kontroluje a odstraňuje zprávy starší než sedm dní. Spotřebitel má přístup ke zprávám, pokud jsou na serveru. Může číst zprávu několikrát a dokonce číst zprávy v opačném pořadí od přijetí. Pokud se však zákazníkovi nepodaří načíst zprávu dříve, než vyprší sedm dní, bude mu tato zpráva chybět.

Kafkova měřítka

Produkční využití společností LinkedIn a dalších podniků ukázalo, že při správné konfiguraci je Apache Kafka schopen denně zpracovat stovky gigabajtů dat. V roce 2011 tři inženýři LinkedIn použili testovací testy k prokázání, že Kafka může dosáhnout mnohem vyšší propustnosti než ActiveMQ a RabbitMQ.

Rychlé nastavení a ukázka aplikace Apache Kafka

V tomto kurzu sestavíme vlastní aplikaci, ale pojďme začít instalací a testováním instance Kafka s out-of-the-box producentem a spotřebitelem.

  1. Navštivte stránku ke stažení Kafky a nainstalujte si nejnovější verzi (od tohoto psaní 0,9).
  2. Extrahujte binární soubory do a software / kafka složku. Pro aktuální verzi je to software / kafka_2.11-0.9.0.0.
  3. Změňte aktuální adresář tak, aby ukazoval na novou složku.
  4. Spusťte server Zookeeper spuštěním příkazu: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Spusťte server Kafka spuštěním: bin / kafka-server-start.sh config / server.properties.
  6. Vytvořte testovací téma, které můžete použít k testování: bin / kafka-topics.sh --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Spusťte jednoduchého konzolového spotřebitele, který může využívat zprávy publikované k danému tématu, například javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 --topický javaworld - od začátku.
  8. Spusťte jednoduchou konzolu producenta, která může publikovat zprávy na testovací téma: bin / kafka-console-producer.sh --broker-list localhost: 9092 --topic javaworld.
  9. Zkuste napsat jednu nebo dvě zprávy do konzoly producenta. Vaše zprávy by se měly zobrazovat ve spotřebitelské konzoli.

Příklad aplikace s Apache Kafkou

Už jste viděli, jak Apache Kafka funguje po vybalení z krabice. Dále pojďme vyvinout vlastní aplikaci výrobce / spotřebitele. Producent načte vstup uživatele z konzoly a odešle každý nový řádek jako zprávu na server Kafka. Spotřebitel načte zprávy pro dané téma a vytiskne je na konzoli. Výrobní a spotřebitelské komponenty jsou v tomto případě vaše vlastní implementace kafka-console-producer.sh a kafka-console-consumer.sh.

Začněme vytvořením Výrobce.java třída. Tato klientská třída obsahuje logiku pro načtení vstupu uživatele z konzoly a odeslání tohoto vstupu jako zprávy na server Kafka.

Nakonfigurujeme producenta vytvořením objektu z java.util.Vlastnosti třídy a nastavení jejích vlastností. Třída ProducerConfig definuje všechny různé dostupné vlastnosti, ale výchozí hodnoty Kafky jsou pro většinu použití dostatečné. Pro výchozí konfiguraci musíme nastavit pouze tři povinné vlastnosti:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) nastavuje seznam párů hostitel: port použitých k navázání počátečních připojení ke clusteru Kakfa v host1: port1, host2: port2, ... formát. I když máme v našem kafkovském klastru více než jednoho brokera, stačí zadat hodnotu prvního brokera hostitel: port. Klient Kafka použije tuto hodnotu k uskutečnění volání brokera, které vrátí seznam všech brokerů v klastru. Je vhodné určit více než jednoho makléře v BOOTSTRAP_SERVERS_CONFIG, takže pokud je první makléř nefunkční, klient bude moci vyzkoušet jiné makléře.

Server Kafka očekává zprávy dovnitř klíč byte [], hodnota byte [] formát. Spíše než převod každého klíče a hodnoty nám knihovna Kafka na straně klienta umožňuje používat přátelštější typy, jako je Tětiva a int pro odesílání zpráv. Knihovna je převede na příslušný typ. Například ukázková aplikace nemá klíč specifický pro zprávu, takže použijeme nula pro klíč. Pro hodnotu použijeme a Tětiva, což jsou data zadaná uživatelem na konzole.

Konfigurovat klíč zprávy, nastavili jsme hodnotu KEY_SERIALIZER_CLASS_CONFIG na org.apache.kafka.common.serialization.ByteArraySerializer. To funguje, protože nula není třeba převádět na byte[]. Pro hodnota zprávy, jsme si stanovili VALUE_SERIALIZER_CLASS_CONFIG na org.apache.kafka.common.serialization.StringSerializer, protože tato třída ví, jak převést a Tětiva do byte[].

Vlastní objekty klíč / hodnota

Podobný StringSerializer, Kafka poskytuje serializátory pro další primitiva jako např int a dlouho. Abychom mohli použít vlastní objekt pro náš klíč nebo hodnotu, museli bychom vytvořit implementující třídu org.apache.kafka.common.serialization.Serializer. Pak bychom mohli přidat logiku, do které by se měla třída serializovat byte[]. Také bychom museli použít odpovídající deserializátor v našem spotřebitelském kódu.

Výrobce Kafka

Po naplnění Vlastnosti třídy s nezbytnými vlastnostmi konfigurace, můžeme ji použít k vytvoření objektu KafkaVýrobce. Kdykoli poté chceme poslat zprávu na server Kafka, vytvoříme objekt z ProducerRecord a zavolat KafkaVýrobceje poslat() metoda s tímto záznamem k odeslání zprávy. The ProducerRecord trvá dva parametry: název tématu, do kterého má být zpráva publikována, a skutečná zpráva. Nezapomeňte zavolat Producer.close () metoda, když jste hotovi s použitím výrobce:

Výpis 1. KafkaProducer

 public class Producer {private static Scanner in; public static void main (String [] argv) vyvolá výjimku {if (argv.length! = 1) {System.err.println ("Uveďte prosím 1 parametr"); System.exit (-1); } Řetězec topicName = argv [0]; in = nový skener (System.in); System.out.println ("Zadejte zprávu (pro ukončení zadejte exit)"); // Konfigurace vlastností producenta configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer (configProperties); Řetězec = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, line); producent. odeslat (rec); line = in.nextLine (); } in.close (); producer.close (); }} 

Konfigurace spotřebitele zprávy

Dále vytvoříme jednoduchého spotřebitele, který se přihlásí k odběru tématu. Kdykoli je k tématu publikována nová zpráva, přečte si ji a vytiskne na konzoli. Spotřebitelský kód je docela podobný kódu výrobce. Začneme vytvořením objektu java.util.Vlastnosti, nastavení jeho vlastností specifických pro spotřebitele a následné použití k vytvoření nového objektu Spotřebitel Kafka. Třída ConsumerConfig definuje všechny vlastnosti, které můžeme nastavit. Existují pouze čtyři povinné vlastnosti:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Stejně jako u třídy producentů použijeme BOOTSTRAP_SERVERS_CONFIG konfigurovat páry hostitel / port pro třídu spotřebitele. Tato konfigurace nám umožňuje navázat počáteční připojení ke clusteru Kakfa v host1: port1, host2: port2, ... formát.

Jak jsem již dříve uvedl, server Kafka očekává zprávy dovnitř byte[] klíč a byte[] formátů hodnot a má vlastní implementaci pro serializaci různých typů do byte[]. Stejně jako u výrobce, na straně spotřebitele budeme muset k převodu použít vlastní deserializátor byte[] zpět do příslušného typu.

V případě ukázkové aplikace víme, že výrobce používá ByteArraySerializer pro klíč a StringSerializer pro hodnotu. Na straně klienta tedy musíme použít org.apache.kafka.common.serialization.ByteArrayDeserializer pro klíč a org.apache.kafka.common.serialization.StringDeserializer pro hodnotu. Nastavení těchto tříd jako hodnot pro KEY_DESERIALIZER_CLASS_CONFIG a VALUE_DESERIALIZER_CLASS_CONFIG umožní spotřebiteli deserializovat byte[] kódované typy zaslané výrobcem.

Nakonec musíme nastavit hodnotu GROUP_ID_CONFIG. Toto by měl být název skupiny ve formátu řetězce. Více o této konfiguraci vysvětlím za minutu. Prozatím se podívejte na spotřebitele Kafka se čtyřmi povinnými vlastnostmi:

$config[zx-auto] not found$config[zx-overlay] not found