V první polovině tohoto úvodu JavaWorld do Apache Kafka jste pomocí Kafky vyvinuli několik aplikací pro malé a velké výrobce / spotřebitele. Z těchto cvičení byste měli znát základy systému zasílání zpráv Apache Kafka. V této druhé polovině se naučíte, jak používat oddíly k distribuci zátěže a horizontálnímu škálování vaší aplikace a zpracovávat až miliony zpráv denně. Dozvíte se také, jak Kafka využívá offsetů zpráv ke sledování a správě komplexního zpracování zpráv a jak chránit váš systém zasílání zpráv Apache Kafka před selháním, pokud by spotřebitel selhal. Ukázkovou aplikaci z části 1 vyvineme pro případy publikování a odběru i případy z bodu na bod.
Příčky v Apache Kafce
Témata v Kafce lze rozdělit na oddíly. Například při vytváření tématu s názvem Demo jej můžete nakonfigurovat tak, aby měl tři oddíly. Server by vytvořil tři soubory protokolu, jeden pro každý z ukázkových oddílů. Když producent zveřejnil zprávu k tématu, přiřadil by této zprávě ID oddílu. Server by pak připojil zprávu do souboru protokolu pouze pro tento oddíl.
Pokud jste poté spustili dva spotřebitele, server může přiřadit oddíly 1 a 2 prvnímu spotřebiteli a oddíl 3 druhému spotřebiteli. Každý spotřebitel by četl pouze ze svých přiřazených oddílů. Ukázkové téma nakonfigurované pro tři oddíly můžete vidět na obrázku 1.
Chcete-li scénář rozšířit, představte si kafkovský klastr se dvěma makléři, umístěný ve dvou strojích. Když jste rozdělili demo téma, nakonfigurovali byste ho tak, aby měl dva oddíly a dvě repliky. Pro tento typ konfigurace by server Kafka přiřadil dva oddíly dvěma makléřům ve vašem klastru. Každý zprostředkovatel by byl vůdcem jednoho z oddílů.
Když producent zveřejnil zprávu, šla by vedoucímu oddílu. Vedoucí by vzal zprávu a připojil ji k souboru protokolu na místním počítači. Druhý makléř by pasivně replikoval tento protokol odevzdání na svůj vlastní stroj. Pokud by vedoucí oddílu klesl, druhý zprostředkovatel by se stal novým vedoucím a začal by obsluhovat požadavky klientů. Stejným způsobem, když spotřebitel poslal požadavek na oddíl, tento požadavek by šel nejprve na vedoucího oddílu, který by vrátil požadované zprávy.
Výhody rozdělení
Zvažte výhody rozdělení systému zpráv založeného na Kafce:
- Škálovatelnost: V systému s pouze jedním oddílem jsou zprávy publikované k tématu uloženy v souboru protokolu, který existuje na jednom počítači. Počet zpráv pro téma se musí vejít do jednoho souboru protokolu potvrzení a velikost uložených zpráv nikdy nemůže být větší než místo na disku daného stroje. Rozdělení tématu umožňuje škálovat váš systém ukládáním zpráv na různých počítačích v clusteru. Pokud byste například chtěli uložit 30 gigabajtů (GB) zpráv pro ukázkové téma, můžete vytvořit cluster Kafka ze tří strojů, každý s 10 GB místa na disku. Potom byste nakonfigurovali téma tak, aby mělo tři oddíly.
- Vyrovnávání zatížení serveru: Mít více oddílů vám umožní rozšířit požadavky na zprávy mezi makléře. Například pokud jste měli téma, které zpracovávalo 1 milion zpráv za sekundu, můžete jej rozdělit na 100 oddílů a přidat do svého clusteru 100 makléřů. Každý zprostředkovatel by byl lídrem pro jeden oddíl, který by odpovídal na pouhých 10 000 požadavků klientů za sekundu.
- Vyrovnávání zátěže spotřebitelů: Podobně jako u vyrovnávání zátěže serveru, hostování více spotřebitelů na různých počítačích vám umožňuje rozložit zatížení spotřebitelů. Řekněme, že jste chtěli spotřebovat 1 milion zpráv za sekundu z tématu se 100 oddíly. Můžete vytvořit 100 spotřebitelů a provozovat je paralelně. Server Kafka by každému spotřebiteli přiřadil jeden oddíl a každý spotřebitel by paralelně zpracoval 10 000 zpráv. Jelikož Kafka přiřazuje každý oddíl pouze jednomu spotřebiteli, v rámci oddílu by byla každá zpráva spotřebována v pořadí.
Dva způsoby rozdělení
Producent je zodpovědný za rozhodnutí, do kterého oddílu se zpráva dostane. Producent má dvě možnosti ovládání tohoto úkolu:
- Vlastní rozdělovač: Můžete vytvořit třídu implementující
org.apache.kafka.clients.producer.Partitioner
rozhraní. Tento zvykOddělovač
implementuje obchodní logiku k rozhodování o tom, kam se zprávy odesílají. - DefaultPartitioner: Pokud nevytvoříte vlastní třídu oddílů, ve výchozím nastavení
org.apache.kafka.clients.producer.internals.DefaultPartitioner
bude použita třída. Výchozí oddíl je pro většinu případů dost dobrý a poskytuje tři možnosti:- Manuál: Když vytvoříte a
ProducerRecord
, použijte přetížený konstruktornový ProducerRecord (topicName, partitionId, messageKey, message)
k zadání ID oddílu. - Hašování (citlivé na lokalitu): Když vytvoříte a
ProducerRecord
, zadejte amessageKey
, volánímnový ProducerRecord (topicName, messageKey, message)
.DefaultPartitioner
použije hash klíče k zajištění toho, aby všechny zprávy pro stejný klíč šly stejnému producentovi. Toto je nejjednodušší a nejběžnější přístup. - Stříkání (náhodné vyvažování zátěže): Pokud nechcete mít kontrolu nad zprávami oddílu, jednoduše zavolejte
nový ProducerRecord (topicName, zpráva)
vytvořit svůjProducerRecord
. V tomto případě bude rozdělovač odesílat zprávy na všechny oddíly způsobem „každý s každým“, což zajistí vyvážené zatížení serveru.
- Manuál: Když vytvoříte a
Rozdělení aplikace Apache Kafka
Pro jednoduchý příklad výrobce / spotřebitele v části 1 jsme použili a DefaultPartitioner
. Nyní se místo toho pokusíme vytvořit vlastní oddíl. V tomto příkladu předpokládejme, že máme maloobchodní web, který mohou spotřebitelé použít k objednávání produktů kdekoli na světě. Na základě využití víme, že většina spotřebitelů je buď ve Spojených státech, nebo v Indii. Chceme rozdělit naši aplikaci tak, aby odesílala objednávky z USA nebo Indie jejich vlastním spotřebitelům, zatímco objednávky odkudkoli jinde půjdou třetím spotřebitelům.
Nejprve vytvoříme CountryPartitioner
který implementuje org.apache.kafka.clients.producer.Partitioner
rozhraní. Musíme implementovat následující metody:
- Zavolá Kafka Konfigurovat () když inicializujeme
Oddělovač
třída, sMapa
konfiguračních vlastností. Tato metoda inicializuje funkce specifické pro obchodní logiku aplikace, například připojení k databázi. V tomto případě chceme poměrně obecný oddíl, který bude trvatnázev státu
jako vlastnost. Pak můžeme použítconfigProperties.put ("partitions.0", "USA")
mapovat tok zpráv do oddílů. V budoucnu můžeme tento formát použít ke změně, které země dostanou svůj vlastní oddíl. - The
Výrobce
Volání API rozdělit() jednou za každou zprávu. V tomto případě ji použijeme ke čtení zprávy a analýze názvu země ze zprávy. Pokud je název země vcountryToPartitionMap
, vrátí separtitionId
uložené vMapa
. Pokud ne, provede hash hodnotu země a použije ji k výpočtu, do kterého oddílu má přejít. - Voláme zavřít() vypnout rozdělovač. Použití této metody zajistí, že všechny prostředky získané během inicializace budou během vypnutí vyčištěny.
Všimněte si, že když volá Kafka Konfigurovat ()
, producent Kafka předá všechny vlastnosti, které jsme pro producenta nakonfigurovali Oddělovač
třída. Je nezbytné, abychom četli pouze ty vlastnosti, které začínají oddíly.
, analyzujte je a získejte partitionId
a uložte ID do countryToPartitionMap
.
Níže je naše vlastní implementace Oddělovač
rozhraní.
Výpis 1. CountryPartitioner
veřejná třída CountryPartitioner implementuje Partitioner {private static Map countryToPartitionMap; public void configure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = nový HashMap (); for (Map.Entry entry: configs.entrySet ()) {if (entry.getKey (). startsWith ("partitions.")) {String keyName = entry.getKey (); String value = (String) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (hodnota, paritionId); }}} oddíl public int (téma řetězce, klíč objektu, byte [] keyBytes, hodnota objektu, byte [] valueBytes, cluster clusteru) {List partition = cluster.availablePartitionsForTopic (topic); String valueStr = (String) hodnota; Řetězec countryName = ((hodnota řetězce)) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Pokud je země namapována na konkrétní oddíl return, vrátí countryToPartitionMap.get (countryName); } else {// Pokud není na konkrétní oddíl namapována žádná země, rozdělte se mezi zbývající oddíly int noOfPartitions = cluster.topics (). size (); návratová hodnota.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}}
The Výrobce
třída v seznamu 2 (níže) je velmi podobná našemu jednoduchému producentovi z části 1, se dvěma změnami označenými tučně:
- Nastavili jsme vlastnost config s klíčem rovným hodnotě
ProducerConfig.PARTITIONER_CLASS_CONFIG
, který odpovídá plně kvalifikovanému názvu našehoCountryPartitioner
třída. Také jsme nastavilinázev státu
napartitionId
, čímž mapujeme vlastnosti, které chceme předatCountryPartitioner
. - Předáme instanci třídy implementující
org.apache.kafka.clients.producer.Callback
rozhraní jako druhý argument proproducent. odeslat ()
metoda. Klient Kafka zavolá svépo dokončení()
metoda, jakmile je zpráva úspěšně publikována, připojení aRecordMetadata
objekt. Tento objekt budeme moci použít k tomu, abychom zjistili, do kterého oddílu byla zpráva odeslána, stejně jako posun přiřazený publikované zprávě.
Výpis 2. Rozdělený producent
public class Producer {private static Scanner in; public static void main (String [] argv) vyvolá výjimku {if (argv.length! = 1) {System.err.println ("Uveďte prosím 1 parametr"); System.exit (-1); } Řetězec topicName = argv [0]; in = nový skener (System.in); System.out.println ("Zadejte zprávu (pro ukončení zadejte exit)"); // Konfigurace vlastností producenta configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "Indie"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer (configProperties); Řetězec = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, null, line); producent. odeslat (rec, new Callback () {public void onCompletion (RecordMetadata metadata, Exception exception) {System.out.println ("Zpráva odeslána do tématu ->" + metadata.topic () + ", parition->" + metadata.partition () + "stored at offset->" + metadata.offset ()); ; }}); line = in.nextLine (); } in.close (); producer.close (); }}
Přiřazení oddílů spotřebitelům
Server Kafka zaručuje, že oddíl je přiřazen pouze jednomu spotřebiteli, čímž je zaručeno pořadí spotřeby zpráv. Můžete ručně přiřadit oddíl nebo jej nechat přiřadit automaticky.
Pokud vaše obchodní logika vyžaduje větší kontrolu, budete muset ručně přiřadit oddíly. V tomto případě byste použili KafkaConsumer.assign ()
předat na server Kakfa seznam oblastí, o které se každý spotřebitel zajímal.
Automaticky přiřazené oddíly jsou výchozí a nejběžnější volbou. V tomto případě server Kafka přiřadí každému spotřebiteli oddíl a znovu přidělí oddíly tak, aby byly škálovatelné pro nové spotřebitele.
Řekněme, že vytváříte nové téma se třemi oddíly. Když spustíte prvního spotřebitele pro nové téma, Kafka přiřadí všechny tři oddíly stejnému spotřebiteli. Pokud pak spustíte druhého spotřebitele, Kafka znovu přiřadí všechny oddíly, přiřadí jeden oddíl prvnímu spotřebiteli a zbývající dva oddíly druhému spotřebiteli. Pokud přidáte třetího spotřebitele, Kafka znovu přiřadí oddíly, takže každému spotřebiteli bude přiřazen jeden oddíl. Nakonec, pokud spustíte čtvrtého a pátého spotřebitele, pak tři spotřebitelé budou mít přiřazený oddíl, ale ostatní nebudou dostávat žádné zprávy. Pokud jeden z počátečních tří oddílů selže, použije Kafka stejnou logiku dělení k opětovnému přiřazení oddílu tohoto spotřebitele jednomu z dalších spotřebitelů.