Programování

Vytvořeno pro realtime: Zasílání velkých objemů dat pomocí Apache Kafka, část 2

V první polovině tohoto úvodu JavaWorld do Apache Kafka jste pomocí Kafky vyvinuli několik aplikací pro malé a velké výrobce / spotřebitele. Z těchto cvičení byste měli znát základy systému zasílání zpráv Apache Kafka. V této druhé polovině se naučíte, jak používat oddíly k distribuci zátěže a horizontálnímu škálování vaší aplikace a zpracovávat až miliony zpráv denně. Dozvíte se také, jak Kafka využívá offsetů zpráv ke sledování a správě komplexního zpracování zpráv a jak chránit váš systém zasílání zpráv Apache Kafka před selháním, pokud by spotřebitel selhal. Ukázkovou aplikaci z části 1 vyvineme pro případy publikování a odběru i případy z bodu na bod.

Příčky v Apache Kafce

Témata v Kafce lze rozdělit na oddíly. Například při vytváření tématu s názvem Demo jej můžete nakonfigurovat tak, aby měl tři oddíly. Server by vytvořil tři soubory protokolu, jeden pro každý z ukázkových oddílů. Když producent zveřejnil zprávu k tématu, přiřadil by této zprávě ID oddílu. Server by pak připojil zprávu do souboru protokolu pouze pro tento oddíl.

Pokud jste poté spustili dva spotřebitele, server může přiřadit oddíly 1 a 2 prvnímu spotřebiteli a oddíl 3 druhému spotřebiteli. Každý spotřebitel by četl pouze ze svých přiřazených oddílů. Ukázkové téma nakonfigurované pro tři oddíly můžete vidět na obrázku 1.

Chcete-li scénář rozšířit, představte si kafkovský klastr se dvěma makléři, umístěný ve dvou strojích. Když jste rozdělili demo téma, nakonfigurovali byste ho tak, aby měl dva oddíly a dvě repliky. Pro tento typ konfigurace by server Kafka přiřadil dva oddíly dvěma makléřům ve vašem klastru. Každý zprostředkovatel by byl vůdcem jednoho z oddílů.

Když producent zveřejnil zprávu, šla by vedoucímu oddílu. Vedoucí by vzal zprávu a připojil ji k souboru protokolu na místním počítači. Druhý makléř by pasivně replikoval tento protokol odevzdání na svůj vlastní stroj. Pokud by vedoucí oddílu klesl, druhý zprostředkovatel by se stal novým vedoucím a začal by obsluhovat požadavky klientů. Stejným způsobem, když spotřebitel poslal požadavek na oddíl, tento požadavek by šel nejprve na vedoucího oddílu, který by vrátil požadované zprávy.

Výhody rozdělení

Zvažte výhody rozdělení systému zpráv založeného na Kafce:

  1. Škálovatelnost: V systému s pouze jedním oddílem jsou zprávy publikované k tématu uloženy v souboru protokolu, který existuje na jednom počítači. Počet zpráv pro téma se musí vejít do jednoho souboru protokolu potvrzení a velikost uložených zpráv nikdy nemůže být větší než místo na disku daného stroje. Rozdělení tématu umožňuje škálovat váš systém ukládáním zpráv na různých počítačích v clusteru. Pokud byste například chtěli uložit 30 gigabajtů (GB) zpráv pro ukázkové téma, můžete vytvořit cluster Kafka ze tří strojů, každý s 10 GB místa na disku. Potom byste nakonfigurovali téma tak, aby mělo tři oddíly.
  2. Vyrovnávání zatížení serveru: Mít více oddílů vám umožní rozšířit požadavky na zprávy mezi makléře. Například pokud jste měli téma, které zpracovávalo 1 milion zpráv za sekundu, můžete jej rozdělit na 100 oddílů a přidat do svého clusteru 100 makléřů. Každý zprostředkovatel by byl lídrem pro jeden oddíl, který by odpovídal na pouhých 10 000 požadavků klientů za sekundu.
  3. Vyrovnávání zátěže spotřebitelů: Podobně jako u vyrovnávání zátěže serveru, hostování více spotřebitelů na různých počítačích vám umožňuje rozložit zatížení spotřebitelů. Řekněme, že jste chtěli spotřebovat 1 milion zpráv za sekundu z tématu se 100 oddíly. Můžete vytvořit 100 spotřebitelů a provozovat je paralelně. Server Kafka by každému spotřebiteli přiřadil jeden oddíl a každý spotřebitel by paralelně zpracoval 10 000 zpráv. Jelikož Kafka přiřazuje každý oddíl pouze jednomu spotřebiteli, v rámci oddílu by byla každá zpráva spotřebována v pořadí.

Dva způsoby rozdělení

Producent je zodpovědný za rozhodnutí, do kterého oddílu se zpráva dostane. Producent má dvě možnosti ovládání tohoto úkolu:

  • Vlastní rozdělovač: Můžete vytvořit třídu implementující org.apache.kafka.clients.producer.Partitioner rozhraní. Tento zvyk Oddělovač implementuje obchodní logiku k rozhodování o tom, kam se zprávy odesílají.
  • DefaultPartitioner: Pokud nevytvoříte vlastní třídu oddílů, ve výchozím nastavení org.apache.kafka.clients.producer.internals.DefaultPartitioner bude použita třída. Výchozí oddíl je pro většinu případů dost dobrý a poskytuje tři možnosti:
    1. Manuál: Když vytvoříte a ProducerRecord, použijte přetížený konstruktor nový ProducerRecord (topicName, partitionId, messageKey, message) k zadání ID oddílu.
    2. Hašování (citlivé na lokalitu): Když vytvoříte a ProducerRecord, zadejte a messageKey, voláním nový ProducerRecord (topicName, messageKey, message). DefaultPartitioner použije hash klíče k zajištění toho, aby všechny zprávy pro stejný klíč šly stejnému producentovi. Toto je nejjednodušší a nejběžnější přístup.
    3. Stříkání (náhodné vyvažování zátěže): Pokud nechcete mít kontrolu nad zprávami oddílu, jednoduše zavolejte nový ProducerRecord (topicName, zpráva) vytvořit svůj ProducerRecord. V tomto případě bude rozdělovač odesílat zprávy na všechny oddíly způsobem „každý s každým“, což zajistí vyvážené zatížení serveru.

Rozdělení aplikace Apache Kafka

Pro jednoduchý příklad výrobce / spotřebitele v části 1 jsme použili a DefaultPartitioner. Nyní se místo toho pokusíme vytvořit vlastní oddíl. V tomto příkladu předpokládejme, že máme maloobchodní web, který mohou spotřebitelé použít k objednávání produktů kdekoli na světě. Na základě využití víme, že většina spotřebitelů je buď ve Spojených státech, nebo v Indii. Chceme rozdělit naši aplikaci tak, aby odesílala objednávky z USA nebo Indie jejich vlastním spotřebitelům, zatímco objednávky odkudkoli jinde půjdou třetím spotřebitelům.

Nejprve vytvoříme CountryPartitioner který implementuje org.apache.kafka.clients.producer.Partitioner rozhraní. Musíme implementovat následující metody:

  1. Zavolá Kafka Konfigurovat () když inicializujeme Oddělovač třída, s Mapa konfiguračních vlastností. Tato metoda inicializuje funkce specifické pro obchodní logiku aplikace, například připojení k databázi. V tomto případě chceme poměrně obecný oddíl, který bude trvat název státu jako vlastnost. Pak můžeme použít configProperties.put ("partitions.0", "USA") mapovat tok zpráv do oddílů. V budoucnu můžeme tento formát použít ke změně, které země dostanou svůj vlastní oddíl.
  2. The Výrobce Volání API rozdělit() jednou za každou zprávu. V tomto případě ji použijeme ke čtení zprávy a analýze názvu země ze zprávy. Pokud je název země v countryToPartitionMap, vrátí se partitionId uložené v Mapa. Pokud ne, provede hash hodnotu země a použije ji k výpočtu, do kterého oddílu má přejít.
  3. Voláme zavřít() vypnout rozdělovač. Použití této metody zajistí, že všechny prostředky získané během inicializace budou během vypnutí vyčištěny.

Všimněte si, že když volá Kafka Konfigurovat (), producent Kafka předá všechny vlastnosti, které jsme pro producenta nakonfigurovali Oddělovač třída. Je nezbytné, abychom četli pouze ty vlastnosti, které začínají oddíly., analyzujte je a získejte partitionIda uložte ID do countryToPartitionMap.

Níže je naše vlastní implementace Oddělovač rozhraní.

Výpis 1. CountryPartitioner

 veřejná třída CountryPartitioner implementuje Partitioner {private static Map countryToPartitionMap; public void configure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = nový HashMap (); for (Map.Entry entry: configs.entrySet ()) {if (entry.getKey (). startsWith ("partitions.")) {String keyName = entry.getKey (); String value = (String) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (hodnota, paritionId); }}} oddíl public int (téma řetězce, klíč objektu, byte [] keyBytes, hodnota objektu, byte [] valueBytes, cluster clusteru) {List partition = cluster.availablePartitionsForTopic (topic); String valueStr = (String) hodnota; Řetězec countryName = ((hodnota řetězce)) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Pokud je země namapována na konkrétní oddíl return, vrátí countryToPartitionMap.get (countryName); } else {// Pokud není na konkrétní oddíl namapována žádná země, rozdělte se mezi zbývající oddíly int noOfPartitions = cluster.topics (). size (); návratová hodnota.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}} 

The Výrobce třída v seznamu 2 (níže) je velmi podobná našemu jednoduchému producentovi z části 1, se dvěma změnami označenými tučně:

  1. Nastavili jsme vlastnost config s klíčem rovným hodnotě ProducerConfig.PARTITIONER_CLASS_CONFIG, který odpovídá plně kvalifikovanému názvu našeho CountryPartitioner třída. Také jsme nastavili název státu na partitionId, čímž mapujeme vlastnosti, které chceme předat CountryPartitioner.
  2. Předáme instanci třídy implementující org.apache.kafka.clients.producer.Callback rozhraní jako druhý argument pro producent. odeslat () metoda. Klient Kafka zavolá své po dokončení() metoda, jakmile je zpráva úspěšně publikována, připojení a RecordMetadata objekt. Tento objekt budeme moci použít k tomu, abychom zjistili, do kterého oddílu byla zpráva odeslána, stejně jako posun přiřazený publikované zprávě.

Výpis 2. Rozdělený producent

 public class Producer {private static Scanner in; public static void main (String [] argv) vyvolá výjimku {if (argv.length! = 1) {System.err.println ("Uveďte prosím 1 parametr"); System.exit (-1); } Řetězec topicName = argv [0]; in = nový skener (System.in); System.out.println ("Zadejte zprávu (pro ukončení zadejte exit)"); // Konfigurace vlastností producenta configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "Indie");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer (configProperties); Řetězec = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, null, line); producent. odeslat (rec, new Callback () {public void onCompletion (RecordMetadata metadata, Exception exception) {System.out.println ("Zpráva odeslána do tématu ->" + metadata.topic () + ", parition->" + metadata.partition () + "stored at offset->" + metadata.offset ()); ; }}); line = in.nextLine (); } in.close (); producer.close (); }} 

Přiřazení oddílů spotřebitelům

Server Kafka zaručuje, že oddíl je přiřazen pouze jednomu spotřebiteli, čímž je zaručeno pořadí spotřeby zpráv. Můžete ručně přiřadit oddíl nebo jej nechat přiřadit automaticky.

Pokud vaše obchodní logika vyžaduje větší kontrolu, budete muset ručně přiřadit oddíly. V tomto případě byste použili KafkaConsumer.assign () předat na server Kakfa seznam oblastí, o které se každý spotřebitel zajímal.

Automaticky přiřazené oddíly jsou výchozí a nejběžnější volbou. V tomto případě server Kafka přiřadí každému spotřebiteli oddíl a znovu přidělí oddíly tak, aby byly škálovatelné pro nové spotřebitele.

Řekněme, že vytváříte nové téma se třemi oddíly. Když spustíte prvního spotřebitele pro nové téma, Kafka přiřadí všechny tři oddíly stejnému spotřebiteli. Pokud pak spustíte druhého spotřebitele, Kafka znovu přiřadí všechny oddíly, přiřadí jeden oddíl prvnímu spotřebiteli a zbývající dva oddíly druhému spotřebiteli. Pokud přidáte třetího spotřebitele, Kafka znovu přiřadí oddíly, takže každému spotřebiteli bude přiřazen jeden oddíl. Nakonec, pokud spustíte čtvrtého a pátého spotřebitele, pak tři spotřebitelé budou mít přiřazený oddíl, ale ostatní nebudou dostávat žádné zprávy. Pokud jeden z počátečních tří oddílů selže, použije Kafka stejnou logiku dělení k opětovnému přiřazení oddílu tohoto spotřebitele jednomu z dalších spotřebitelů.

$config[zx-auto] not found$config[zx-overlay] not found