<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Rory Warin</title>
    <description>The latest articles on Forem by Rory Warin (@rorywarin).</description>
    <link>https://forem.com/rorywarin</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F946554%2Fed5028a0-9988-4b1a-b404-c045ebe3d519.jpg</url>
      <title>Forem: Rory Warin</title>
      <link>https://forem.com/rorywarin</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/rorywarin"/>
    <language>en</language>
    <item>
      <title>Discovery | Synonym Generation at Bloomreach</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 13:31:48 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-synonym-generation-at-bloomreach-ob5</link>
      <guid>https://forem.com/bloomreach/discovery-synonym-generation-at-bloomreach-ob5</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Apurva Gupta, Antariksh Bothale &amp;amp; Soubhik Bhattacharya from Bloomreach, 2016&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Abstract
&lt;/h2&gt;

&lt;p&gt;As a company that’s in the business of helping people find things and helping our customers “Get Found,” it’s important for Bloomreach to accurately understand what people mean when they search for something. User queries are written in human language, which leads to dialectal variations in vocabulary (different names for the same thing—cookies/biscuits). Then there is the problem of context. In the phrases “frozen yogurt” and “frozen meat,” the word “frozen” carries a very different meaning from the meaning it carries in the phrase, “Frozen toys,” which refers to the hit Disney movie. This problem isn’t helped by the fact that search queries are often just bunches of words, devoid of capitalization, punctuation and/or syntactical clues that can make the meaning clearer – ”nj devils tees” would need to be understood as ”New Jersey Devils T-shirts.” Over- and under-specificity of search queries is another common problem – people searching for “hp printers,” might also be interested in buying printers from other brands.&lt;/p&gt;

&lt;p&gt;From new fashion trends and terminology (jorts, anyone?), to words acquiring new connotations (such as the frozen → Frozen™ example), the eCommerce world springs several traps on us on our way to improving query understanding and improving search quality.&lt;/p&gt;

&lt;p&gt;Synonym extraction and generation is one of the ways in which we deal with the continuously evolving, contextual nature of human language. To bridge the gap between the query and actual content, we figure out different possible representations of a word and process them to generate synonyms. To mine for synonym pairs, we look at descriptions of 100 million products, process billions of lines of text downloaded from the Web and also look for possible synonyms for queries amongst other queries [30 million queries].&lt;/p&gt;

&lt;p&gt;At Bloomreach, we do this every week, learning new words and meanings from new data.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why do synonyms need automated extraction/generation?
&lt;/h2&gt;

&lt;p&gt;Traditionally eCommerce search engines have relied on following approaches to attain synonyms:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Thesauri: There are many freely available thesauri on the market, which capture different synonyms of words.&lt;/li&gt;
&lt;li&gt;Stemmers: e.g. Porter stemmer, which contains rules to reduce words to their root forms — “mangoes =&amp;gt; mango.”&lt;/li&gt;
&lt;li&gt;Curated Dictionary: Manual entries that define synonyms.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In our tests we found these approaches to have a slew of problems. For example:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Context: A thesaurus will have “black =&amp;gt; dark” as synonyms. This is applicable in certain contexts such as “black night =&amp;gt; dark night”, but not in “black dress =&amp;gt; dark dress”.&lt;/li&gt;
&lt;li&gt;Usage: Thesauri do not cover general use of language. “rolling laptop case =&amp;gt; rolling computer bag”. In other words, thesauri are not designed for eCommerce or products.&lt;/li&gt;
&lt;li&gt;Evolution: “frozen princess =&amp;gt; elsa” is a relationship with which all viewers of the movie “Frozen” would be familiar, but which dictionaries would not be.&lt;/li&gt;
&lt;li&gt;Precision: In Web search, a user is presented with millions of results and generally finds relevant results among the first few. Whereas in eCommerce, people tend to go through hundreds of results to find the product they wanted. In such a scenario, tail results also need to be relevant, e.g. showing “short stemmed roses” for a search of “shorts” is a very bad user experience.&lt;/li&gt;
&lt;li&gt;Sorting: Also, eCommerce portals allow users to sort by price, popularity etc. Such sorting prominently shows users results that otherwise would have been at the end. Hence adding a bad or context-free synonym such as “black dress =&amp;gt; dark dress,” may, under certain sorts, show users dresses which are not black.&lt;/li&gt;
&lt;li&gt;Hyponyms: It is difficult to find mappings such as “computer accessories =&amp;gt; mouse, keyboard” in a dictionary. These are not synonyms but nevertheless are required in order to bridge the gap from user query to the product.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Although these problems can be resolved by using heuristics or by cleaning/augmenting the dictionaries manually, such an approach requires significant investment of manpower; and as language keeps evolving, it is a continuous investment.&lt;/p&gt;

&lt;h2&gt;
  
  
  Different types of synonyms
&lt;/h2&gt;

&lt;p&gt;Given a query Q, a phrase Q’ will be called synonym of Q, if results of Q’ are also relevant for Q.&lt;/p&gt;

&lt;p&gt;Adhering to the above definition, we can classify synonyms into various classes: &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Spelling or Lemma variants (e.g. for the query “wifi adapter”)&lt;br&gt;
a) Abbreviation: wireless USB adapter&lt;br&gt;
b) Space variation: wi-fi USB adapter&lt;br&gt;
c) Lemma Variants: wifi USB adaptor&lt;br&gt;
d) Stem: wifi USB adapters, wireless USB adapters&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Same meaning&lt;br&gt;
a) women’s =&amp;gt;  ladies’&lt;br&gt;
b) graph paper =&amp;gt; grid paper&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Hyponyms/Hypernyms. In this class, synonyms describe a superset of the query:&lt;br&gt;
a) desktop pc =&amp;gt; computers.&lt;br&gt;
b) football gear =&amp;gt; football cleats, football gloves, football helmet.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Related. These are not synonyms, but related phrases. They can be substituted when results are not available for true synonyms.&lt;br&gt;
a) paper plates =&amp;gt; plastic plates&lt;br&gt;
b) abacus =&amp;gt; calculator&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Different types of synonyms are applied with varying confidence and methodologies: e.g. adapter =&amp;gt; adapters can be applied with very high confidence and irrespective of context but “apple =&amp;gt; apples” cannot be. Similarly, “plastic plates” should be shown for a search of “paper plates,” only when paper plates are not available. Synonyms need to be graded by quality and type when they are generated in order to account for the complexity of properly applying them.&lt;/p&gt;

&lt;h2&gt;
  
  
  Generation
&lt;/h2&gt;

&lt;p&gt;The process of synonym generation is closer to “mining” than “searching:” e.g. we do not specifically generate synonyms for a query Q, but we generate all possible synonym pairs and expect pairs with Q or its subparts to have been generated. Just like mining, we can increase the probability of finding useful pairs by looking into the right data and using the right techniques. And just like mining, it will be all in the background, hidden from search users.&lt;/p&gt;

&lt;h2&gt;
  
  
  Representation
&lt;/h2&gt;

&lt;p&gt;In a few words, the process of synonym generation can be classified into one which filters candidate synonym pairs, where each pair is a tuple of two phrases . Let us take the example of this pair . To us (humans) it is obvious that these two phrases are talking about the same objects. But to a machine, which has no context of what a presentation is or what it means to click, these two phrases are just sequences of letters, as randomly paired as any two other phrases. A simple English phrase, such as “wireless presenter,” contains a lot of contextual information, which goes amiss when we use just these two words to describe something. In representation phase we get a more complete description about this phrase using different sources of information. A few such possible representations and the information they augment are:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Neighboring Context (Bag or embeddings): You shall know a word by the company it keeps (Firth, J. R.)We collect phrases surrounding a phrase (e.g. frozen toys) from all over the Web. Then we count and sort these phrases by their frequency. Thus we obtain a list of frequently occurring phrases around “frozen toys.” These phrases describe the contexts in which “frozen toys” occur. Intuition behind such representation is that words/phrases with similar meaning would occur in similar contexts. Thus by comparing contexts of two phrases, we can compute a similarity score – which gives a quantitative association between two words, or phrases:&lt;/li&gt;
&lt;li&gt;P(P1,P2) = probability of P1 and P2 occurring adjacently in same query&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Rep(P1) = P(P1,Pi) for all Pi where {Pi} is the set of all phrases in corpus and Rep(P1) is the final representation of P1 in this method.&lt;br&gt;
Recently, word embeddings such as word vectors generated by word2vec have been discovered to represent such contextual information in many fewer dimensions.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Product Clicks (Behavioral): Each phrase is represented as a probability distribution of products/documents that have been viewed/clicked/bought etc., after a user used this phrase as a query. Intuition behind such a  representation is that for two search queries, q1 and q2, if users have the same/similar intent, they will click on similar documents and will have similar choices of documents. For example, if a product document is a relevant swimsuit, it will also be relevant  swimwear. So, intuitively it should get user clicks for both the search queries and it will also be clicked more frequently than a less relevant swimwear/swimsuit.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;P(d1,q) = probability of document d1 being clicked after q was queried and before any other query is done&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;REP(q) = {P(di,q)} for all documents di&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Document vector: A phrase is represented by the “average of documents“ that contain this phrase. To achieve this, we first represent each document containing the phrase as a tf-idf vector of terms/phrases in that document. Then we take the average of all these vectors to achieve a vector representation for the “average of documents.”&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Rep(D1) = for all phrases Pi in document D1 {this is representation of document D1}&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Rep(P1) = average of Rep(Di) for all Di where P1 occurs {average of all documents containing P1 , represents P1}&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Nearby queries: Whenever a user queries for “X”  and then issues another query, “Y”, we add “Y” to representation of “X”. Thus we obtain a frequency count of phrases (such as “Y”), which can be used to represent “X”.P(P1,P2) = probability of P1 and P2 occurring in same sessionRep(P1) = P(P1,Pi) for all Pi where {Pi} is the set of all phrases in corpus and Rep(P1) is the final representation of P1 in this method.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Such representations map the humble two-word pairs to a pair of vectors, which have hundreds of dimensions. This new pair contains far more information and is more obvious for an algorithm as a pair.&lt;/p&gt;

&lt;h2&gt;
  
  
  Similarity
&lt;/h2&gt;

&lt;p&gt;We borrow different distance and divergence metrics from various spaces to judge the quality of a candidate pair.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Different similarity measures:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Asymmetric: Asymmetric measures are used to figure out hyponym/hypernym pairs. For example a low value of |A∩B| / |A| and high value of |A∩B| / |B|, indicates that B is close to being a subset of A. If A and B were products viewed for queries A’ and B’, this measure can be used to determine if A is hypernym of B or vice versa.&lt;/li&gt;
&lt;li&gt;Symmetric: Symmetric measures determine if A and B are approximately equivalent or contain similar information. To compare the distributional representation of clicks (2) of two phrases, we use KL – divergence, which is a measure of the information lost when distribution Q is used to approximate distribution P.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;KL – Divergence of two distributions for phrase P and phrase Q.&lt;/p&gt;

&lt;p&gt;Where summation is over the set of documents and P(i) is the probability of a document being clicked for phrase P. But this is a directional measure and is valid only when Q = 0 implies P = 0. Hence we use a symmetrized version which is JS divergence.&lt;/p&gt;

&lt;p&gt;A good JSD score implies that P and Q are both good approximations of each other. Although, if P and Q are over a very small event space, confidence on this score will not be enough to use P and Q as synonym. Having another measure of confidence has proven to help in such cases.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Augment with extra similarities: Besides computing the similarities on the basis of the above representations, we can augment certain similarities such as edit distance.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;After computing all similarities and augmentation, we have various scores which tell us the grade and type of synonym pairs. We observed that simple heuristics such as thresholding over the vector of scores works well. But these scores are continuously distributed and choosing a threshold manually caused a lot of good synonyms to get removed. This led us to the finding that thresholds would have to be phrase specific. For example, the distance between “black” and “dark” is less than the distance between “timer =&amp;gt; stopwatch,” whereas, the second one happens to be a better synonym pair. We found that using scores of some obvious synonyms for a query helped in determining the threshold: e.g. Any phrase which is closer to “shoe” than “shoes” would be a good synonym for shoe. Thus, D(P, Q) &amp;lt; D(P,P’) where P’ is an obvious variation of P implies that P, Q are good candidates for synonymy.&lt;/p&gt;

&lt;p&gt;Since we had already represented phrases and similarity of pairs as vectors , we can also feed these vectors to a supervised classifier for extracting synonyms.&lt;/p&gt;

&lt;h2&gt;
  
  
  Diagram
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--f54sM2p6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/rxtt6o90cgw2vybdmhuf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--f54sM2p6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/rxtt6o90cgw2vybdmhuf.png" alt="Image description" width="800" height="160"&gt;&lt;/a&gt;&lt;br&gt;
Illustration of synonym generation system&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In this blo,g we have described how we generate different types of synonyms at BloomReach. It helps us in bridging the gap between our customers and the end users who search on eCommerce websites. We have tried our best to keep the methods independent of language, by not relying on grammar, etc. This will help us in serving people searching for products in different languages. Just like spoken language, our algorithms have their share of problems. They also make mistakes and have their good and bad days. We continuously work at improving them and welcome any suggestions or feedback.&lt;/p&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;p&gt;&lt;a href="http://web.stanford.edu/class/cs124/lec/sem"&gt;http://web.stanford.edu/class/cs124/lec/sem&lt;/a&gt;&lt;br&gt;
&lt;a href="http://www.aclweb.org/anthology/C10-2151"&gt;http://www.aclweb.org/anthology/C10-2151&lt;/a&gt;&lt;br&gt;
&lt;a href="http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36252.pdf"&gt;http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36252.pdf&lt;/a&gt;&lt;br&gt;
&lt;a href="http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.58.4300"&gt;http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.58.4300&lt;/a&gt;&lt;br&gt;
&lt;a href="http://research.microsoft.com/pubs/167835/idg811-cheng.pdf"&gt;http://research.microsoft.com/pubs/167835/idg811-cheng.pdf&lt;/a&gt;&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | SolrCloud Rebalance API</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 13:22:25 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-solrcloud-rebalance-api-4bh8</link>
      <guid>https://forem.com/bloomreach/discovery-solrcloud-rebalance-api-4bh8</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Nitin Sharma &amp;amp; Suruchi Shah from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In a multi-tenant search architecture, as the size of data grows, the manual management of collections, ranking/search configurations becomes non-trivial and cumbersome. This blog describes an innovative approach we implemented at Bloomreach that helps with an effective index and a dynamic config management system for massive multi-tenant search infrastructure in SolrCloud.&lt;/p&gt;

&lt;h2&gt;
  
  
  Problem
&lt;/h2&gt;

&lt;p&gt;The inability to have granular control over index and config management for Solr collections introduces complexities in geographically spanned, massive multi-tenant architectures. Some common scenarios, involving adding and removing nodes, growing collections and their configs, make cluster management a significant challenge. Currently, Solr doesn’t offer a scaling framework to enable any of these operations. Although there are some basic Solr APIs to do trivial core manipulation, they don’t satisfy the scaling requirements at Bloomreach.&lt;/p&gt;

&lt;h2&gt;
  
  
  Innovative Data Management in SolrCloud Architecture
&lt;/h2&gt;

&lt;p&gt;To address the scaling and index management issues, we have designed and implemented the Rebalance API, as shown in Figure 1. This API allows robust index and config manipulation in SolrCloud, while guaranteeing zero downtime using various scaling and allocation strategies. It has  two dimensions:&lt;/p&gt;

&lt;p&gt;The seven scaling strategies are as follows:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Auto Shard allows re-sharding an entire collection to any number of destination shards. The process includes re-distributing the index and configs consistently across the new shards, while avoiding any heavy re-indexing processes.  It also offers the following flavors:&lt;/li&gt;
&lt;li&gt;Flip Alias Flag controls whether or not the alias name of a collection (if it already had an alias) should automatically switch to the new collection.&lt;/li&gt;
&lt;li&gt;Size-based sharding allows the user to specify the desired size of the destination shards for the collection. As a result, the system defines the final number of shards depending on the total index size.&lt;/li&gt;
&lt;li&gt;Redistribute enables distribution of cores/replicas across unused nodes. Oftentimes, the cores are concentrated within a few nodes. Redistribute allows load sharing by balancing the replicas across all nodes.&lt;/li&gt;
&lt;li&gt;Replace allows migrating all the cores from a source node to a destination node. It is useful in cases requiring replacement of an entire node.&lt;/li&gt;
&lt;li&gt;Scale Up adds new replicas for a shard. The default allocation strategy for scaling up is unused nodes. Scale up also has the ability to replicate additional custom per-merchant configs in addition to the index replication (as an extension to the existing replication handler, which only syncs the index files)&lt;/li&gt;
&lt;li&gt;Scale Down removes the given number of replicas from a shard.&lt;/li&gt;
&lt;li&gt;Remove Dead Nodes is an extension of Scale Down, which allows removal of the replicas/shards from dead nodes for a given collection. In the process, the logic unregisters the replicas from Zookeeper. This in-turn saves a lot of back-and-forth communication between Solr and Zookeeper in their constant attempt to find the replicas on dead nodes.&lt;/li&gt;
&lt;li&gt;Discovery-based Redistribution allows distribution of all collections as new nodes are introduced into a cluster. Currently, when a node is added to a cluster, no operations take place by default. With redistribution, we introduce the ability to rearrange the existing collections across all the nodes evenly.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Figure 1: Rebalance API overview&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--eR4dovIY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4hvohwrpedwxi4so6k2s.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--eR4dovIY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4hvohwrpedwxi4so6k2s.png" alt="Image description" width="800" height="556"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Scenarios
&lt;/h2&gt;

&lt;p&gt;Let’s take a quick view at some resolved uses cases using the new Rebalance API.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Scenario 1: Re-sharding to meet latency SLAs&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Collections often grow dynamically, resulting in an increased number of documents to retrieve (up to ~160M documents) and slowing down the process. In order to meet our latency SLAs, we decide to re-shard the collection. The process of increasing shards, for instance from nine to 12, for a given collection, is challenging since there is no accessible method to divide the index data evenly while controlling the placement of new shards on desired nodes.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;API call&lt;/li&gt;
&lt;li&gt;End Result: As observed in the diagram below, adding a shard doesn’t add any documents by default. Additionally, the node on which the new shard resides is based on the machine triggering the action. With the Rebalance API, we automatically distribute the documents by segmenting into even parts across the new shards.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Figure 2: Auto-Sharding to an increased number of shards.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--YkmfBhnk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/1pjeqfnamgdggphrgqx2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--YkmfBhnk--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/1pjeqfnamgdggphrgqx2.png" alt="Image description" width="694" height="496"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Scenario 2:  Data Migration from node to node.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;We have two nodes, one of them is out or dead and we want to migrate the replicas/cores to a different live node. OR we encounter an uneven number of replicas on a set number of nodes, leading to a skewed distribution of data load, and we need to redistribute it across nodes.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;API Call: &lt;/li&gt;
&lt;li&gt;End Result: As observed in the diagram below, the BEFORE section demonstrates the uneven distribution of replicas/cores across the three nodes. Upon calling the REDISTRIBUTE strategy, we divide the replicas/cores across all nodes evenly.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Figure 3: Redistributing replicas/cores across the nodes. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--SSY6IwED--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/e0fddvq7lifv203deayo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--SSY6IwED--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/e0fddvq7lifv203deayo.png" alt="Image description" width="700" height="525"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Scenario 3:  Dynamic Horizontal Scaling&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Dynamic Horizontal Scaling is very useful when, for instance, we have two cores for a collection and want to temporarily scale up or scale down based on traffic needs and allocation strategy.&lt;/p&gt;

&lt;p&gt;API Call: &lt;a href="http://host:port/solr/admin/collections?action=REBALANCE&amp;amp;scaling_strategy=SCALE_UP"&gt;http://host:port/solr/admin/collections?action=REBALANCE&amp;amp;scaling_strategy=SCALE_UP&lt;/a&gt; &amp;amp;num_replicas=2&amp;amp;collection=collection_name&lt;/p&gt;

&lt;p&gt;End Result: We observe in the diagram below that when new replicas are added, they have to be added one at the time, without control over node allocation. Furthermore, only the index files get replicated. According to the new Rebalance API, all the custom configs are replicated in addition to the index files on the new replicas where the nodes are chosen based on the allocation strategy.&lt;/p&gt;

&lt;p&gt;Figure 4: Scaling up both shards by adding 2 replicas each&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Zx5sZE-W--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wc023g2fnuptb7lyhlsy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Zx5sZE-W--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wc023g2fnuptb7lyhlsy.png" alt="Image description" width="753" height="477"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The chart below compares the states gathered from running tests to calculate the average time to split indexes using various approaches. It is important to note that while the second method only accounts for index distribution, the REBALANCE API (third and fourth) methods also include replication of custom configs.&lt;/p&gt;

&lt;p&gt;As we notice in the table below, the Bloomreach Rebalance API performs much better, compared to the first two methods in terms of time. Furthermore, we parallelized the split and sync operation by making the Rebalance API more efficient as demonstrated in the fourth method (for collections over 150M it gives 95% savings in auto-sharding compared to re-indexing).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--z7TsoAD0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/50jnfq8eghd6bb9qf1r4.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--z7TsoAD0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/50jnfq8eghd6bb9qf1r4.jpeg" alt="Image description" width="604" height="225"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In a nutshell, Bloomreach’s new Rebalance API helps scaling SolrCloud architecture by ensuring high availability, zero downtime, seamless shard management and by providing a lot more control over index and config manipulation. Additionally, this faster and more robust mechanism has paved the way to automated recovery by allowing dynamic resizing of collections.&lt;/p&gt;

&lt;p&gt;And that’s not all! We have implemented the Rebalance API in a generic way so that it can be open sourced. So stay tuned for more details!&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Solr Compute Cloud - An Elastic Solr Infrastructure</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 13:14:53 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-solr-compute-cloud-an-elastic-solr-infrastructure-57k9</link>
      <guid>https://forem.com/bloomreach/discovery-solr-compute-cloud-an-elastic-solr-infrastructure-57k9</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Nitin Sharma &amp;amp; Li Ding from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Scaling a multi-tenant search platform that has high availability while maintaining low latency is a hard problem to solve.  It’s especially hard when the platform is running a heterogeneous workload on hundreds of millions of documents and hundreds of collections in SolrCloud.&lt;/p&gt;

&lt;p&gt;Typically search platforms have a shared cluster setup. It does not scale out of the box for heterogenous use cases. A few of the shortcomings are listed below.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Uneven workload distribution causing noisy neighbor problem. One search pipeline affects another pipeline’s performance. (especially if they are running against the same Solr collection).&lt;/li&gt;
&lt;li&gt;Impossible to tune Solr cache for the same collection for different query patterns.&lt;/li&gt;
&lt;li&gt;Commit Frequency varies across indexing jobs causing unpredictable write load in the SolrCloud cluster.&lt;/li&gt;
&lt;li&gt;Bad clients leaking connections that could potentially bring down the cluster.&lt;/li&gt;
&lt;li&gt;Provisioning for the peak causes un-optimal resource utilization.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The key to solve these problems is isolation.  We isolate the write and read of each the collections. At Bloomreach, we have implemented an Elastic Solr Infrastructure that dynamically grows/shrinks. It helps provide the right amount of isolation among pipelines while improving resource utilization.The SC2 API and HAFT services ( built in house) give us the ability to do the isolation and scale the platform in an elastic manner while guaranteeing high availability, low latency and low operational cost.&lt;/p&gt;

&lt;p&gt;This blog describes our innovative solution in greater detail and how we scaled our infrastructure to be truly elastic and cost optimized.  We plan to open source HAFT Service in the future for anyone who is interested in building their own highly available Solr search platform.&lt;/p&gt;

&lt;h2&gt;
  
  
  Problem Statement
&lt;/h2&gt;

&lt;p&gt;Below is a diagram that describes our workload.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--M1qHA6Mv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/6q3ghj3fxswkg4odt2l4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--M1qHA6Mv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/6q3ghj3fxswkg4odt2l4.png" alt="Image description" width="770" height="363"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this scheme, our production Solr cluster is the center for everything. As for the other elements:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;At the bottom, the public API is serving an average of 100 QPS (query per second).&lt;/li&gt;
&lt;li&gt;The blue boxes labeled indexing will commit index updates (full and partial indexing) to the system.  Every indexing job represents a different customer’s data, which commit at different frequencies from everyday to every hour to every half hour.&lt;/li&gt;
&lt;li&gt;The red boxes labeled pipeline are jobs to run ranking and relevance queries.  The pipelines represent various types of read/analytical workload issued against customer data.  Two or more pipelines can run against the same collections at the same time, which increases the complexity.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;With this kind of workload, we are facing several key challenges with each client we serve.  The graph below illustrates some the challenges with this architecture:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--2RgoCMFL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/r33s0g30gp0qg7fu32sv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--2RgoCMFL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/r33s0g30gp0qg7fu32sv.png" alt="Image description" width="800" height="598"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For indexing jobs:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Commits at same time as heavy reads: Indexing jobs running at the same time as pipelines and customer queries, which impact both pipelines and the latency of customer queries.&lt;/li&gt;
&lt;li&gt;Frequent commits: Frequent commits and non-batched updates cause performance issues in Solr.&lt;/li&gt;
&lt;li&gt;Leaked indexing: Indexing jobs might fail resulting in leaked clients, which get accumulated over time.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;For the pipeline jobs:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Cache tuning: Impossible for Solr to tune the cache. The query pattern varies between pipeline jobs when working on the same collections.&lt;/li&gt;
&lt;li&gt;OOM and high CPU usage: Unevenly distributed workload among Solr hosts in the clusters.  Some nodes might have OOM error while other nodes have high CPU usage.&lt;/li&gt;
&lt;li&gt;Bad pipeline: One bad client or query could bring down the performance of the entire cluster or make one node unresponsive.&lt;/li&gt;
&lt;li&gt;Heavy load pipeline: One heavy load pipeline would affect other smaller pipelines.&lt;/li&gt;
&lt;li&gt;Concurrent pipelines: The more concurrent pipeline jobs we ran, the more failures we saw.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Bloomreach Search Architecture
&lt;/h2&gt;

&lt;p&gt;Left unchecked, these problems would eventually affect the availability and latency SLA with our customers.  The key to solving these problems is isolation. Imagine if every pipeline and indexing job had its own Solr cluster, containing the collections they need, and every cluster was optimized for that job in terms of system, application and cache requirements.  The production Solr cluster wouldn’t have any impact from those workloads.  At Bloomreach, we designed and implemented a system we call Solr Compute Cloud (SC2) to isolate all the workload to scale the Solr search platform.&lt;/p&gt;

&lt;p&gt;The architecture overview of SC2 is shown in the diagram below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--VjpJ6I1r--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/98jmk7eygh9jwvq2rcwe.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--VjpJ6I1r--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/98jmk7eygh9jwvq2rcwe.png" alt="Image description" width="800" height="452"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We have an elastic layer of clusters which is the primary source of data for large indexing and analysis MapReduce pipelines.  This prevents direct access to production clusters from any pipelines.  Only search queries from customers are allowed to access production clusters.  The technologies behind elastic layer are SC2 API and Solr HAFT (High Availability and Fault Tolerance) Service (both built in-house).&lt;/p&gt;

&lt;p&gt;SC2 API features include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Provisioning and dynamic resource allocation: Fulfill client requests by creating SC2 clusters using cost-optimized instances that match resources necessary for requested collections.&lt;/li&gt;
&lt;li&gt;Garbage collection:  Will automatically terminate the SC2 clusters that exceed an allowed lifetime setting or idle for a certain amount of time.&lt;/li&gt;
&lt;li&gt;Pipeline and indexing job performance monitoring:  Monitor the cost and performance of each running job.&lt;/li&gt;
&lt;li&gt;Reusability:  Create an instance pool based on the request type to provision clusters faster. The API will also find an existing cluster based on the request data for read pipelines instead of provisioning a new cluster.  Those clusters are at low utilization.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Solr HAFT Service provides several key features to support our SC2 infrastructure.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Replace node: When one node is down in a Solr cluster, this feature automatically adds a new node to replace that node.&lt;/li&gt;
&lt;li&gt;Add replicas: Add extra replicas to existing collections if the query performance is getting worse.&lt;/li&gt;
&lt;li&gt;Repair collection: When a collection is down, this feature repairs the collection by deleting the existing collection. Then it re-creates and streams data from backup Solr clusters.&lt;/li&gt;
&lt;li&gt;Collection versioning: Config of each collection can be versioned and rolled back to previous known healthy config if a bad config was uploaded to Solr.&lt;/li&gt;
&lt;li&gt;Dynamic replica creation: Creates and streams data to a new collection based on the replica requirement of the new collection.&lt;/li&gt;
&lt;li&gt;Cluster clone: Automatically creates a new cluster based on existing serving cluster setup and streaming data from backup cluster.&lt;/li&gt;
&lt;li&gt;Cluster swap: Automatically switches Solr clusters so that the bad Solr cluster can be moved out of serving traffic and the good or newly cloned cluster can be moved in to serve traffic.&lt;/li&gt;
&lt;li&gt;Cluster state reconstruction: Reconstructs the state of newly cloned Solr cluster from existing serving cluster.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Read Workflow&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--gkH9ddI7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/izyserehb33kpk3l4mel.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--gkH9ddI7--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/izyserehb33kpk3l4mel.png" alt="Image description" width="594" height="398"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We will describe the detailed steps of how read pipeline jobs work in SC2:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Read pipeline requests collection and desired replicas from SC2 API.&lt;/li&gt;
&lt;li&gt;SC2 API provisions SC2 cluster dynamically with needed setup (and streams Solr data).&lt;/li&gt;
&lt;li&gt;SC2 calls HAFT service to request data replication.&lt;/li&gt;
&lt;li&gt;HAFT service replicate data from production to provisioned cluster.&lt;/li&gt;
&lt;li&gt;Pipeline uses this cluster to run job.&lt;/li&gt;
&lt;li&gt;After pipeline job finishes, call SC2 API to terminate the cluster.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Indexing Workflow&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--TlWbWuK1--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/52rjsnv10n08bf0ol7e0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--TlWbWuK1--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/52rjsnv10n08bf0ol7e0.png" alt="Image description" width="601" height="412"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Below are detailed steps describing how an indexing job works in SC2.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The indexing job uses SC2 API to create a SC2 cluster of collection A with two replicas.&lt;/li&gt;
&lt;li&gt;SC2 API provisions SC2 cluster dynamically with needed setup (and streams Solr data).&lt;/li&gt;
&lt;li&gt;Indexer uses this cluster to index the data.&lt;/li&gt;
&lt;li&gt;Indexer calls HAFT service to replicate the index from SC2 cluster to production.&lt;/li&gt;
&lt;li&gt;HAFT service reads data from dynamic cluster and replicates to production Solr.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;After the job finishes, it will call SC2 API to terminate the SC2 cluster.&lt;/p&gt;

&lt;p&gt;Solr/Lucene Revolution Talk  2014 at Washington, D.C.&lt;/p&gt;

&lt;p&gt;We spoke in detail about the Elastic Infrastructure at Bloomreach in last year’s Solr Conference. The link to the video of the talk and the slides are below.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Talk: &lt;a href="https://www.youtube.com/watch?v=1sxBiXsW6BQ"&gt;https://www.youtube.com/watch?v=1sxBiXsW6BQ&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Slides: &lt;a href="http://www.slideshare.net/nitinssn/solr-compute-cloud-an-elastic-solrcloud-infrastructure"&gt;http://www.slideshare.net/nitinssn/solr-compute-cloud-an-elastic-solrcloud-infrastructure&lt;/a&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Scaling a search platform with heterogeneous workload for hundreds of millions of documents and a massive number of collections in SolrCloud is nontrivial. A kitchen-sink shared cluster approach does not scale well and has a lot of shortcomings such as  uneven workload distribution, sub-optimal cache tuning, unpredictable commit frequency  and misbehaving clients leaking connections.&lt;/p&gt;

&lt;p&gt;The key to solve these problems is &lt;strong&gt;isolation&lt;/strong&gt;. Not only do we isolate the read and write jobs as a whole but also isolate write and read of each the collection.  The in-house built SC2 API and HAFT services give us the ability to do the isolation and scale the platform in an elastic manner.&lt;/p&gt;

&lt;p&gt;The SC2 infrastructure gives us high availability and low latency with low cost by isolating heterogeneous workloads from production clusters.  We plan to open source HAFT Service in the future for anyone who is interested in building their own highly available Solr search platform.&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Introducing Briefly: A Python DSL to Scale Complex Mapreduce Pipelines</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 13:07:35 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-introducing-briefly-a-python-dsl-to-scale-complex-mapreduce-pipelines-35m5</link>
      <guid>https://forem.com/bloomreach/discovery-introducing-briefly-a-python-dsl-to-scale-complex-mapreduce-pipelines-35m5</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Chou-han Yang from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Briefly
&lt;/h2&gt;

&lt;p&gt;Today we are excited to announce Briefly, a new open-source project designed to tackle the challenge of simultaneously handling the flow of Hadoop and non-Hadoop tasks. In short, Briefly is a Python-based, meta-programming job-flow control engine for big data processing pipelines. We called it Briefly because it provides us with a way to describe complex data processing flows in a very concise way.&lt;/p&gt;

&lt;p&gt;At Bloomreach, we have hundreds of Hadoop clusters running with different applications at any given time. From parsing HTML pages and creating indexes to aggregating page visits, we all rely on Hadoop for our day to day work. The job sounds simple, but the challenge is to handle complex operational issues without compromising code quality, as well as the ability to control a group of Hadoop clusters to maximize efficiency.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Challenges:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Data skew, or the high variance of data volume from different customers.&lt;/li&gt;
&lt;li&gt;Non-Hadoop tasks are mixed in with the Hadoop tasks and need to be completed before or after the Hadoop tasks.&lt;/li&gt;
&lt;li&gt;The fact that we run Elastic Map Reduce (EMR) with spot instances, which means clusters might be killed because of spot instance price spikes.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;There are several different approaches to solve similar problems with pipeline abstraction, including cascading and Luigi, but they all solve our problems partially. They all provide some features, but none of them help in the case of multiple Hadoop clusters. That’s why we turned to Briefly to solve our large-scale pipeline processing problems.&lt;/p&gt;

&lt;p&gt;The main idea behind Briefly, is to wrap all types of jobs into a concise Python function, so we only need to focus on the job flow logic instead of operational issues (such as fail/retry). For example, a typical Hadoop job in Briefly is wrapped like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@simple_hadoop_process

def preprocess(self):

  self.config.defaults(

    main_class = 'com.bloomreach.html.preprocess',

    args = ['${input}', '${output}']

  )



@simple_hadoop_process

def parse(self, params):

  self.config.defaults(

    main_class = 'com.bloomreach.html.parser',

    args = ['${input}', '${output}', params]

  )

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And similarly, a Java process and a python process look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@simple_java_process

def gen_index(self):

  self.config.defaults(

    classpath = ['.', 'some.other.classpath']

    main_class = 'com.bloomreach.html.genIndex',

    args = ['${input}', '${output}']

  )



@simple_process

def gen_stats(self):

  for line in self.read():

    # Do something here to analyze each line

  self.write('stats output')


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And here’s what it looks like when we chain the jobs together to create dependencies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;objs = Pipeline("My first pipeline")

prop = objs.prop



parsed_html = source(raw_html) | preprocess() | parse(params)

index = parsed_html | gen_index()

stats = parsed_html | gen_stats()



targets = [stats, index]

objs.run(targets)


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This script creates the workflow like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--XTmkbzEi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vcfzluqsopyadzzgfele.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--XTmkbzEi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vcfzluqsopyadzzgfele.png" alt="Image description" width="800" height="286"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The pipeline can be executed locally (with Hadoop local mode), on Amazon EMR, or on Qubole simply by supplying different configurations. For example, running on Amazon EMR would require the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# emr.conf



# Where to run your pipeline. It can be local, emr, or qubole

hadoop.runner = "emr"



# Max number of concurrent EMR clusters to be created

emr.max_cluster = 10



# Instance groups for each cluster

emr.instance_groups = [[1, "MASTER", "m2.2xlarge"], [9, "CORE", "m2.2xlarge"]]



# Name of your EMR cluster

emr.cluster_name = "my-emr-cluster"



# A unique name for the project for cost tracking purpose

emr.project_name = "my-emr-project"



# Where EMR is going to put yoru log

emr.log_uri = "s3://log-bucket/log-path/"



# EC2 key pairs if you want to login into your EMR cluster

emr.keyname = "ec2-keypair"



# Spot instance price upgrade strategy. The multipliers to the EC2 on-demand price you want

# to bid against the spot instances. 0 means use on-demand instances.

emr.price_upgrade_rate = [0.8, 1.5, 0]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Extra keys also need to be provided for specific platforms, such as Amazon EMR:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# your_pipeline.conf



ec2.key = "your_ec2_key"

ec2.secret = "your_ec2_secret"


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And then your are good go. Run your Briefly pipeline with all your configuration files:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python your_pipeline.py -p your_pipeline.conf -p emr.conf -Dbuild_dir=build_dir_path

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now you can have several different configurations for running job locally, on Qubole, or with different cluster sizes. One thing we find useful is to subdivide a big cluster into smaller clusters which increases the survivability of the entire group of clusters, especially when running on spot instances.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--OUiEA3Wj--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uczrp24trb1nao5611m9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--OUiEA3Wj--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uczrp24trb1nao5611m9.png" alt="Image description" width="800" height="429"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The number of clusters and the cluster size can be adjusted according to the jobs being executed. Many small clusters provides better throughput when running with a lot of small jobs. On the other hand, a large job may run longer on a few clusters while other clusters may be terminated after a predetermined idle time. The setting can be changed easily in the configuration for performance tests.&lt;/p&gt;

&lt;h2&gt;
  
  
  Other features Briefly
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Use of a Hartman pipeline to create job flow.&lt;/li&gt;
&lt;li&gt;Resource management for multiple Hadoop clusters (Amazon EMR, Qubole) for parallel execution, also allowing customized Hadoop clusters.&lt;/li&gt;
&lt;li&gt;Individual logs for each process to make debugging easier.&lt;/li&gt;
&lt;li&gt;Fully resumable pipeline with customizable execution check and error handling.&lt;/li&gt;
&lt;li&gt;Encapsulated local and remote filesystem (s3) for unified access.&lt;/li&gt;
&lt;li&gt;Automatic download of files from s3 for local processes and upload of files to s3 for remote processes with s4cmd.&lt;/li&gt;
&lt;li&gt;Automatic fail/retry logic for all failed processes.&lt;/li&gt;
&lt;li&gt;Automatic price upgrades for EMR clusters with spot instances.&lt;/li&gt;
&lt;li&gt;Timeout for Hadoop jobs to prevent long-running clusters.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;We use Briefly to build and operate complex data processing pipelines across multiple mapreduce clusters. Briefly provides us with an ability to simplify the pipeline building and separate the operational logic from business logic, which makes each component reusable.&lt;/p&gt;

&lt;p&gt;Please leave us feedback, file issues and submit pull requests if you find this useful. The code is available on GitHub at &lt;a href="https://github.com/bloomreach/briefly"&gt;https://github.com/bloomreach/briefly&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Strategies for Reducing Your Amazon EMR Costs</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 13:02:07 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-strategies-for-reducing-your-amazon-emr-costs-1on7</link>
      <guid>https://forem.com/bloomreach/discovery-strategies-for-reducing-your-amazon-emr-costs-1on7</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Prateek Gupta from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Bloomreach has built a personalized discovery platform with applications for organic search, site search, content marketing and merchandizing. Bloomreach ingests data from a variety of sources such as merchant inventory feed, sitefetch data from merchants’ websites and pixel data. The data is collected, parsed, stored and used to match user intent to content on merchants’ websites and to provide merchants with insights into consumer behavior and the performance of products on their sites.&lt;/p&gt;

&lt;h2&gt;
  
  
  Merchant Data
&lt;/h2&gt;

&lt;p&gt;A sample data ingestion flow for merchant data is shown in the figure below. Bloomreach ingests merchant data including crawled merchant pages, merchant feed, and pixel data. There are ETL (extract-transform-load) flows that clean, filter and normalize the data and put it into the product database. Individual applications may use this data to produce derived relations. The product database also supports many applications including the “What’s Hot” application that displays relevant trending products to the user on merchant website.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--c7l4zLLK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ys7lur0t181k94alqcm2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--c7l4zLLK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ys7lur0t181k94alqcm2.png" alt="Image description" width="620" height="406"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Below is a sample workflow for personalization:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--d5xvzYhT--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qmsdpx61wqq8vdrlvnpo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--d5xvzYhT--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qmsdpx61wqq8vdrlvnpo.png" alt="Image description" width="636" height="373"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;At Bloomreach, we launch 1,500 to 2,000 Amazon EMR clusters and run 6,000 Hadoop jobs every day. As a growing company, we’ve seen our use of Amazon EMR rise dramatically in a short time:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--6et06g1e--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/pgz66n2lplnip2y72z92.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--6et06g1e--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/pgz66n2lplnip2y72z92.png" alt="Image description" width="532" height="332"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Amazon EMR Costs
&lt;/h2&gt;

&lt;p&gt;It is critical that we keep our Amazon EMR costs down as we scale up. To that end, we’ve adopted the following strategies&lt;/p&gt;

&lt;p&gt;1) Use AWS Spot Instances rather than On-Demand Instances whenever possible. Amazon Elastic Cloud Compute (Amazon EC2) Spot Instances are unused Amazon EC2 capacity that you bid on; the price you pay is determined by the supply and demand for Spot Instances. The cost of using Spot Instances can be 80% less than using On-Demand Instances. It’s important to manage Spot Instances because they can be terminated if the Spot market price exceeds your bid price. At Bloomreach, we have written an orchestration system that schedules jobs on Amazon EMR. The system implements a Hartmann pipeline that can run a variety of jobs both locally and on Amazon EMR. It can also detect failures such as Spot Instance termination and reschedule jobs on different clusters as needed.&lt;/p&gt;

&lt;p&gt;2) Create a system that shares clusters among several small jobs rather than launching a separate cluster for every job. Remember, whether your job takes 10 minutes or 60 minutes, you’re paying for an hour of access. If you have four 10-minute jobs, you could share one cluster to do them all and be charged for one hour. Or you could employ one cluster for each and be charged for four hours. Sharing clusters among jobs also allows you to save the time and cost of bootstrapping a new cluster. The time savings alone can be a significant factor for real-time jobs.&lt;/p&gt;

&lt;p&gt;3) Use Amazon EMR tags for cost tracking. Using EMR tags lets you track the cost of your cloud usage by project or by department, which gives you deeper insight into return on investment and provides transparency for budgeting purposes.&lt;/p&gt;

&lt;p&gt;4) Create a lifecycle management system that allows you to track clusters and eliminate idle clusters.&lt;/p&gt;

&lt;p&gt;5) Use the right instance types for your jobs. For example, use c3 instance type for compute-heavy jobs. This can significantly reduce waste and costs based on the scale of your jobs. Below is an algorithm we have found useful for selecting the instance type with the best value for compute capacity based on its Spot price:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
maxCpuPerUnitPrice = 0

optimalInstanceType = null

For each instance_type in (Availability Zone, Region) {

cpuPerUnitPrice = instance.cpuCores/instance.spotPrice

if (maxCpuPerUnitPrice &amp;lt; cpuPerUnitPrice) {

optimalInstanceType = instance_type;

}

}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Incorporating these Amazon EMR strategies can reduce EMR costs, increase efficiency and make a good thing even better.&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Crawling Billions of Pages: Building Large Scale Crawling Cluster (Pt 2)</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 12:58:36 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-crawling-billions-of-pages-building-large-scale-crawling-cluster-pt-2-320l</link>
      <guid>https://forem.com/bloomreach/discovery-crawling-billions-of-pages-building-large-scale-crawling-cluster-pt-2-320l</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Chou-han Yang from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Previously in “Crawling Billions of Pages: Building Large Scale Crawling Cluster (Pt 2)” we talked about the way to build an asynchronous fetcher to download raw HTML pages effectively. Now we have to go from a single machine to a cluster of fetchers, therefore, we need a way to synchronize all the fetcher nodes so that they won’t overlap. Essentially we want a queue. The requirements for the queuing system are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Handle billions of URLs.&lt;/li&gt;
&lt;li&gt;Maintain constant web crawling speed per each domain.&lt;/li&gt;
&lt;li&gt;Dynamically reschedule failed jobs.&lt;/li&gt;
&lt;li&gt;Provide status report for crawling cluster in realtime.&lt;/li&gt;
&lt;li&gt;Support multiple queues with different priorities.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In Bloomreach’s case, we use a single Redis server with 16GB memory, which can handle about 16 billion URLs. It is equivalent to one URL per one byte of memory. What follows is a description of our design process and some of its highlights.&lt;/p&gt;

&lt;h2&gt;
  
  
  Minimize the size of each entry
&lt;/h2&gt;

&lt;p&gt;The first challenge to design a queuing system is to figure out what data you need to put into the queue. Once the data size for each entry is determined, it is a lot easier to estimate the capacity of the queuing system. In our case, if we put all the required URLs into the queue, assuming each URL consumes 512 bytes, our 16GB system would hold only 32 million URLs. This design is definitely not enough to handle billions of URLs.&lt;/p&gt;

&lt;p&gt;So we have to combine multiple URLs into a single run corresponding to a task. In our system, we have 1,000 URLs per run. So, we can use a single path pointing to a S3 (Amazon Simple Storage Service) file containing a list of 1,000 URLs for this task. Effectively, we only need to have 0.5K bytes for 1,000 URLs in memory. Therefore, we should be able to handle 16 billion URLs with just 8GB of memory. Sounds easy, doesn’t it?&lt;/p&gt;

&lt;h2&gt;
  
  
  Transfer of Task Ownership
&lt;/h2&gt;

&lt;p&gt;A typical queuing system allows task entries being pushed into or popped out. Effectively the ownership of a task is transferred from the producer of the task entry to the queue, and then from the queue to the task executor when the task is popped out from the queue.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--23Svm-4V--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/upc6a0risj5s0y07bzo8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--23Svm-4V--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/upc6a0risj5s0y07bzo8.png" alt="Image description" width="710" height="402"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The downside of this naive approach is that if the fetcher node crashes for any reason, the tasks currently running will be lost. There are several ways to avoid this:&lt;/p&gt;

&lt;p&gt;1) Provide task timeout. The queuing system can reclaim the task after certain period of time.&lt;br&gt;
Pros: Clients of the queue don’t need to change.&lt;br&gt;
Cons: Once the task is reclaimed and rescheduled, it may run concurrently with the stale task.&lt;/p&gt;

&lt;p&gt;2) Clients of the queue keep tasks in a file, and restart after crash.&lt;br&gt;
Pros: Queue server doesn’t need to change.&lt;br&gt;
Cons: Clients need to have a persistent database to maintain the running tasks.&lt;/p&gt;

&lt;p&gt;Yet, there is still the need to transfer task ownership between client and server, which makes the system unnecessarily complicated. What we can do instead is keep the ownership of those tasks in the server after they are enqueued. That way the client doesn’t have any ownership of the task at any time. Instead we have a stateless list of tasks at any given time. It is very similar to how our library system works. Image every task is a book in the library and readers are clients who would like to read as much as possible. If we allow readers to ‘check-out’ books, we face the risk that some readers may lose their books. One solution is to never allow readers to ‘check-out’ books, and instead have them read digital versions online. That way, all the content stays in the library and there is no risk of losing books.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--D98eB8H9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kv1x1c5nsv19qknhdpm7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--D98eB8H9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kv1x1c5nsv19qknhdpm7.png" alt="Image description" width="800" height="502"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Each fetcher, the client of the queuing system, can obtain several tasks running in parallel at any moment. Like the library system, a reader is allowed to check out several books at the same time. So internally, the queuing system maintains a list of tasks each client is currently running. From the task list, each client would be able to check what task it should execute without actually looking into the queue. This approach dramatically reduces the complexity of the system because each client doesn’t need to know how the queue is implemented inside the queuing system. We will see how this helps build a more complex scheduling algorithm later.&lt;/p&gt;

&lt;p&gt;As shown above, each client periodically syncs its own task list with the server. New tasks will be started once a client discovers new items on the list. On the other hand, once a task is marked done by a client, the server will remove the task and add a new task to the list.&lt;/p&gt;

&lt;h2&gt;
  
  
  Scheduling Algorithm
&lt;/h2&gt;

&lt;p&gt;Currently, more than 60 percent of global internet traffic consists of requests from crawlers or some type of automated Web discovery system. A typical website with a single node can handle from one request per second to hundreds of requests per second, depending on the scalability and implementation. So web crawling is likely to put loads on the Web servers, therefore, a Web crawler should be polite and crawl each domain at very moderate speed, usually slower than 10 seconds a page. Crawling with extremely high speed could be considered a distributed denial-of-service (DDoS) attack. The perceived DDoS attack will result in your crawler quickly being blacklisted. One straightforward way to control the overall crawling speed is to control the number of fetching jobs running across the whole cluster.&lt;/p&gt;

&lt;p&gt;On the other hand, one-off requests may come for various of reasons. People want to debug a certain set of pages, or more commonly, part of a website changes faster than others. For example, the homepage is usually updated more often than other pages. Therefore, having multiple queues for each domain is very useful in many cases.&lt;/p&gt;

&lt;p&gt;The temptation is to create a priority queue with each task preassigned a priority. That way you can always process the priority tasks first. A priority queue has two issues: the priority value itself in each entry may increase the size of the task in memory. And secondly, high priority tasks will block low priority tasks. This is called starvation in scheduling algorithms. Monitoring the queue is also a challenge since most of the queuing systems use a linked list as the underlying data structure. Therefore, traversing the entire queue to read associated metadata is not practical.&lt;/p&gt;

&lt;p&gt;A better approach is to use a randomized scheduler, for example, weighted fair queuing. With this approach, we can randomly schedule jobs from a group of queues without having different priorities at a job level. The chance of a queue being selected for scheduling is proportional to the priority of the queue.&lt;/p&gt;

&lt;p&gt;A cool thing about this approach is that there is no starvation. Even jobs on queues with very low priority will get scheduled at some point. Reading progress for each queue is now a trivial matter because tasks with different priorities are put in different queues instead of mixing into a single priority queue.&lt;/p&gt;

&lt;h2&gt;
  
  
  Multi-tiered Aggregations
&lt;/h2&gt;

&lt;p&gt;On top of the scheduling and task dispatching, monitoring and showing stats is crucial for production operation. The monitoring server doesn’t have to be on the same node as the queue, but we put them on the same node for simplicity.&lt;/p&gt;

&lt;p&gt;The goal is to show the real-time aggregation for different criteria, such as per queue or per fetcher node. Every second, each node will send back the effective web crawling speed and error rates for each task (set of 1000 URLs). We can selectively aggregate the data and display the final results. If you imagine the input values as a single vector, the whole aggregation process can be written down as a single matrix multiplication. Each element in the matrix will have a value of either zero or one. Zero means the corresponding input value is not selected for aggregation, and one means the value is selected.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--dpSWlGbF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/z21f8obfngva6q7pvnp0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--dpSWlGbF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/z21f8obfngva6q7pvnp0.png" alt="Image description" width="397" height="265"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Matrix operations can be optimized with numerical libraries which take advantage of vectorized hardware operations, such as numPy. Instead of hard-coding those operations in a nested loop, we transform the whole aggregation process into a single matrix multiplication and then leverage numPy to perform the operation for us. After that, we can get the aggregated value every second, and then push those values upward to the minute, hour, and daily aggregation.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--4Ac9fryn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/hqn1dfsp65h0ldcfjlzi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--4Ac9fryn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/hqn1dfsp65h0ldcfjlzi.png" alt="Image description" width="701" height="550"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Web UI can get real-time graph from the server without joining data every second. This approach basically amortizes the expansive joining for a given time period by running a partial join operation every second.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Handling billions of URLs in a single queuing system is definitely achievable. The key takeaway is to have the right architecture to help with data sharding and failure handling. Making every component stateless is essential for scaling out. Our queuing system, which has been running for more than a year, has handled more than 12 billion URLs without any sign of slowing down. The queuing system is a cornerstone for the robustness of our web crawling system.&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Crawling Billions of Pages: Building Large Scale Crawling Cluster (Pt 1)</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 12:53:12 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-crawling-billions-of-pages-building-large-scale-crawling-cluster-pt-1-4p6</link>
      <guid>https://forem.com/bloomreach/discovery-crawling-billions-of-pages-building-large-scale-crawling-cluster-pt-1-4p6</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Chou-han Yang from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;At Bloomreach, we are constantly crawling our customers’ websites to ensure their quality and to obtain the information we need to run our marketing applications. It is fairly easy to build a prototype with a few lines of scripts and there are a bunch of open source tools available to do that, such as Apache Nutch. We chose to build our own web crawling cluster at Bloomreach after evaluating several open source options. Our requirements were:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Handle more than billions of pages (in about a week).&lt;/li&gt;
&lt;li&gt;Use Hadoop (with Amazon EMR) to parse those pages efficiently.&lt;/li&gt;
&lt;li&gt;Have constant QPS (query per second) for each website.&lt;/li&gt;
&lt;li&gt;Have multiple task queues per each website with different priorities.&lt;/li&gt;
&lt;li&gt;Handle long latency for slow websites.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A very typical architecture for web crawling clusters includes three main components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A fetcher that send HTTP requests and reads content.&lt;/li&gt;
&lt;li&gt;A centralized queuing system for job management and distribution.&lt;/li&gt;
&lt;li&gt;A backend pipeline to parse and post-process pages.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--c1I6JnUK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yd1b773w4o8b0bzolqcg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--c1I6JnUK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yd1b773w4o8b0bzolqcg.png" alt="Image description" width="800" height="623"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For part one of this series, I would like to focus on the fetcher that we use to crawl our customers’ pages. I’ll cover other components separately in future posts.&lt;/p&gt;

&lt;h2&gt;
  
  
  First attempt: single process loop
&lt;/h2&gt;

&lt;p&gt;To kick-start our discussion, we will just use simple code snippets to demonstrate a very simple fetcher:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.URL;

import java.util.List;



public class Crawler {



  public static void crawl(List urls) throws IOException {

    for (String urlStr : urls) {

      URL url = new URL(urlStr);



      BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()));

      processHTML(in);

      in.close();

    }

  }



  public static void processHTML(BufferedReader in) {

    // ...

  }



}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a very straightforward implementation of the fetcher with several potential scaling issues:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;It uses single thread and only one thread per process. In order to concurrently fetch from more than one website, the fetcher needs multiple processes.&lt;/li&gt;
&lt;li&gt;If a single page takes a long time to process, or even worse, the server times out without any response, the whole process will be stuck.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;As straightforward as it is, this approach won’t go very far before some operational headaches set in. So naturally, a better approach would be to use multiple threads in a single process. Unfortunately, with this system, the memory overhead for each process will quickly consume all your memory space.&lt;/p&gt;

&lt;p&gt;Second attempt: multithreaded HTTP client&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;package scratch;



import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.URL;

import java.util.List;



public class Crawler implements Runnable {



  private String urlStr = null;



  public Crawler(String urlStr) {

    this.urlStr = urlStr;

  }



  public static void crawl(List urls) {

    for (String urlStr : urls) {

      new Thread(new Crawler(urlStr)).run();

    }

  }



  @Override

  public void run() {

    try {

      URL url = new URL(urlStr);



      BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()));

      processHTML(in);

      in.close();

    } catch (IOException e) {

      // Deal with exception.

    }

  }



  public static void processHTML(BufferedReader in) {

    // ...

  }



}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This process seems more modern and it removes the requirements to run more than one process on a single machine. But the shortcoming that a single page can stop the whole loop remains. Compared to multiple process, multiple thread has better memory efficiency, but it will reach its limit when you are running at least 400 to 500 threads on a quad core machine.&lt;/p&gt;

&lt;h2&gt;
  
  
  Third attempt: asynchronous HTTP (Windows style)
&lt;/h2&gt;

&lt;p&gt;To solve the problem of blocking threads for each website loop, people long ago developed solutions for Windows. An experienced Windows IIS programmer would be very familiar with the event-driven programming paradigm. Coming up with the same code in Java isn’t easy, but it might look something like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;package scratch;



import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.URL;

import java.util.List;



public class Crawler {



  public static void crawl(List urls) {

    for (String urlStr : urls) {

      AsyncHttpClient client = new AsyncHttpClient();

      Response response = client.prepareGet(url).execute(new AsyncHandler&amp;amp;lt;T&amp;amp;gt;() {



        void onThrowable(Throwable t) {

        }



        public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {

          processHTML(bodyPart);

          return STATE.CONTINUE;

        }



        public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {

          return STATE.CONTINUE;

        }



        public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {

          return STATE.CONTINUE;

        }



        T onCompleted() throws Exception {

          return T;

        }

      });

    }

  }



  public static void processHTML(BufferedReader in) {

    // ...

  }



}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Windows usually uses a single thread to process all events, but you can allow multiple threads by changing the setting of the IIS Web server. The Windows operating system can dispatch different events to different window handlers so you can handle all asynchronous HTTP calls efficiently. For a very long time, people weren’t able to do this on Linux-based operating systems since the underlying socket library contained a potential bottleneck.&lt;/p&gt;

&lt;h2&gt;
  
  
  Fourth attempt: HTTP client with asynchronous I/O
&lt;/h2&gt;

&lt;p&gt;The potential bottleneck has been removed by kernel 2.5.44 with the introduction to epoll system call. This allows a process to monitor a huge number of TCP connections without polling from each connection one-by-one. This also triggered the creation of series non-blocking libraries such as Java NIO.&lt;/p&gt;

&lt;p&gt;Network libraries based on Java NIO have the benefit of easily scaling from a few thousands to tens of thousands of TCP connection per machine. The CPU no longer spends time in a waiting state or context switching between a huge number of threads. Therefore, performance and throughput both increase.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import java.net.*;

import java.util.List;



import org.jboss.netty.bootstrap.ClientBootstrap;

import org.jboss.netty.buffer.ChannelBuffer;

import org.jboss.netty.channel.*;

import org.jboss.netty.handler.codec.http.*;



import static org.jboss.netty.channel.Channels.pipeline;



public class Crawler extends SimpleChannelUpstreamHandler {



  public static void crawl(List urls) throws URISyntaxException {

    for (String urlStr : urls) {

      new Crawler().asyncRead(urlStr);

    }

  }



  public void asyncRead(String urlStr) throws URISyntaxException {

    URI uri = new URI(urlStr);



    // Configure the client.

    ClientBootstrap bootstrap = new ClientBootstrap();



    final SimpleChannelUpstreamHandler handler = this;



    // Set up the event pipeline factory.

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {



      @Override

      public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = pipeline();

        pipeline.addLast("handler", handler);

        return pipeline;

      }

    });



    // Start the connection attempt.

    ChannelFuture future = bootstrap.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));



    // Wait until the connection attempt succeeds or fails.

    Channel channel = future.awaitUninterruptibly().getChannel();

    if (!future.isSuccess()) {

      future.getCause().printStackTrace();

      bootstrap.releaseExternalResources();

      return;

    }



    // Prepare the HTTP request.

    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());



    // Send the HTTP request.

    channel.write(request);



    // Wait for the server to close the connection.

    channel.getCloseFuture().awaitUninterruptibly();



    // Shut down executor threads to exit.

    bootstrap.releaseExternalResources();

  }



  @Override

  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

    HttpResponse response = (HttpResponse) e.getMessage();

    processHTML(response.getContent());

  }



  public void processHTML(ChannelBuffer content) {

    // ...

  }



}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We use Netty to build our crawler, not only because it uses Java NIO, but because it also provides a good pipeline abstraction to the network stack. It is easy to insert a handler to HTTPS, compression, or time-out without compromising the code structure.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Based on our stress tests, each node with a quad-core CPU can go up to 600 queries per second, reaching the maximum network bandwidth, with its average HTML of size 400K bytes. With a six-node cluster, we can crawl at 3,600 QPS, which is about 311 million pages a day, or 1.2 billion pages in four days.&lt;/p&gt;

&lt;p&gt;Next time, we will talk about how to store tasks with a very long list of URLs with efficient queuing and scheduling.&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | The Evolution of Fault Tolerant Redis Cluster</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 12:44:43 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-the-evolution-of-fault-tolerant-redis-cluster-2g41</link>
      <guid>https://forem.com/bloomreach/discovery-the-evolution-of-fault-tolerant-redis-cluster-2g41</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Hongche Liu &amp;amp; Jurgen Philippaerts from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;At Bloomreach, we use Redis, an open source advanced key-value cache and store, which is often referred to as a data structure server since values can contain strings, hashes, lists, sets, sorted sets, bitmaps and hyperloglogs. In one application, we use Redis to store 16 billion URLs in our massive parallel crawlers in. We use Redis to store/compute Cassandra clients’ access rate for rate limiting purpose in another. But this post is focused on yet another particular application — real-time personalization, in which we use Redis to store in-session user activities.&lt;/p&gt;

&lt;p&gt;For this job, fault tolerance is a requirement. Without it, there would be no real-time, in-session personalization. As far as a Redis cluster development is concerned, fault tolerance and scalability have received attention only recently. Some features, like sharding, are not even available as a stable version yet in the main Redis repository. Fortunately, there are some industry solutions to fill the gap. This article covers the operations and administration of fault tolerance and scalability of the Redis cluster architecture used at Bloomreach. Many of the topics are discoverable but scattered around the Web, indicating the high commonality, but wide variety, among many industry applications using Redis.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Failsafe Redis Setup&lt;/strong&gt; - we had a humble beginning.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--YPxA4Sbb--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/oywp1xft35eb51ashcw7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--YPxA4Sbb--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/oywp1xft35eb51ashcw7.png" alt="Image description" width="624" height="320"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The simple setup here suits our need at the time because our data is less than the memory capacity of the instance we are using. If our user in-session data grows out of the current instance’s memory capacity, we will have to migrate the Redis node to a bigger instance. If you are asking, “How the heck can this system scale up?” — good.  You are ahead of the game. We will cover scaling in a later section. Here we will focus on fault tolerance.&lt;/p&gt;

&lt;h2&gt;
  
  
  Fault Tolerance Against Instance Failure
&lt;/h2&gt;

&lt;p&gt;For fault tolerance against instance failure, we take advantage of the DNS failover setup (for example, AWS Route 53 Failover Policy) as in the diagram below.&lt;/p&gt;

&lt;p&gt;We set up two CNAME records in DNS. In each of them, we configure the routing policy to be failover. For Redis1 CNAME, we set up the failover record type to be primary and attach the proper health check. For Redis2 (the hot backup), we set up the failover record type to be secondary and attach the proper health check.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--xo3l1Zcl--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/e0bys0ln67mx63394ncr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--xo3l1Zcl--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/e0bys0ln67mx63394ncr.png" alt="Image description" width="390" height="305"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this setup, under normal circumstances, DNS for Redis.bloomreach.com (mock name) returns Redis1 (mock name). When the health check for Redis1 detects that it is down, the DNS resolution will point to Redis2 (mock name) automatically. When Redis1 is back, the DNS server will resolve Redis.bloomreach.com back to Redis1 again.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--OY0whSIu--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/afliv7g2tt2qzp7f4xcg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--OY0whSIu--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/afliv7g2tt2qzp7f4xcg.png" alt="Image description" width="534" height="404"&gt;&lt;/a&gt;&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--PciMXJAL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0apw7vyq3q5z8vp8qzp0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--PciMXJAL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0apw7vyq3q5z8vp8qzp0.png" alt="Image description" width="543" height="430"&gt;&lt;/a&gt;&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--hhOoWSO3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kp0co4cfjvtro92956zb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--hhOoWSO3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kp0co4cfjvtro92956zb.png" alt="Image description" width="571" height="303"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;T0 is the time when Redis1 goes down.&lt;/p&gt;

&lt;p&gt;TN is the time when the DNS service’s health check determines that Redis1 is down. It is also the time when the DNS resolution will point to Redis2, the backup live one.&lt;/p&gt;

&lt;p&gt;TD is the time when the application’s DNS TTL (Time to Live) elapsed. It is also the time when application will get Redis2 host for the DNS lookup for Redis.bloomreach.com.&lt;/p&gt;

&lt;p&gt;TR is the time when Redis1 comes back.&lt;/p&gt;

&lt;p&gt;Between T0 and TD, the application would try to write or read from Redis1, which would fail.&lt;/p&gt;

&lt;p&gt;So the application down time is&lt;/p&gt;

&lt;p&gt;TD – T0 = health_check_grace_period + DNS_TTL&lt;/p&gt;

&lt;p&gt;Between TD and TR, all the data, say D, go to Redis2, not replicated to Redis1.&lt;/p&gt;

&lt;p&gt;So at TR when DNS points back to Redis1, all the written data D will be non-accessible.&lt;/p&gt;

&lt;p&gt;To prevent the loss, we set up pager alert on Redis1 down, with the instruction to flip the replication from Redis2 to Redis1. Before we tried to automate this manual task. Redis has since come up with a good solution with sentinel in version 2.8, which is what we moved to next. It will be covered in the next section.&lt;/p&gt;
&lt;h2&gt;
  
  
  Fault Prevention Tips
&lt;/h2&gt;

&lt;p&gt;But before we go there, I’d like to cover some topics that prevent faults (instead of tolerating faults):&lt;/p&gt;

&lt;p&gt;1) If you are building a production grade Redis cluster, you must follow the tips on this Redis Administration page. During the pre-production baking stage, I personally encountered an issue “fork() error during background save”. The problem and the solution (setting overcommit on Linux to 1) has been noted here. Various production level issues and troubleshooting tips have been well documented on the Redis Administration page and it’s really helpful to pay attention to the issues listed there.&lt;br&gt;
2) One item not covered on the above page is eviction policy. Without setting it up properly, we have encountered the case of Redis being stuck. Our application uses a Redis cluster mainly as a data store for in-session user activities. During peak traffic time, or season, the amount of data could spike beyond the capacity. The default eviction policy is “noeviction.”&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;maxmemory-policy noeviction

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It means when the memory is full, no new data can be written into Redis, not a behavior we would like. After studying the industry experiences and testing, we settled on the following eviction policy. This is the policy that when memory is full, it evicts the data that is closest to expiring. It is the safest behavior in our application.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;maxmemory-policy volatile-ttl
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;3) Another configuration issue we ran into was with Linux open file ulimit. Every TCP connection to Redis is an open file in Linux. The standard AWS Ubuntu image comes with open file size limit of 1024. When we provisioned more application servers for a stress test in preparation for the holiday season, we encountered  the serious problem of application servers getting stuck in startup phase when they initialized sessions with Redis and the Redis hosts ran out of open file handles. It is particularly difficult to trace the problem correctly to the ulimit setting because restarting Redis (the most intuitive operation) resolves the symptoms temporarily. There are also many other settings (wrong port, authentication setting) that can result in the similar symptoms. The key thing to observe is the error message, “Connection reset by peer.” It is an Linux level error, not Redis. Using lsof command confirmed the connection count.&lt;br&gt;
Do not confuse the Linux open files ulimit with Redis configuration of maxclient.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;maxclients 10000


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Although both must be sufficient for your application’s architecture. There are many resources pointing to the solutions to this problem. We set up an Ubuntu upstart conf with the following line:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;limit nofile 10240 10240


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This helped us pass the stress test. &lt;/p&gt;

&lt;h2&gt;
  
  
  Automatic Failover with Redis Sentinel
&lt;/h2&gt;

&lt;p&gt;The previous section describes a simple fault-tolerant Redis setup that does not handle recovery well. So during the holiday season, we upgraded from the default Ubuntu Redis version 2.2 to 2.8, which has a new Redis component called sentinel, with the distinct feature of automatic recovery during a server fault. The setup is depicted in the following diagram:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--XC6zHnn2--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ggu3434pkkwk0d6fbkp4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--XC6zHnn2--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ggu3434pkkwk0d6fbkp4.png" alt="Image description" width="623" height="368"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this setup, Redis sentinel plays a crucial role in system monitoring and failover situation. It keeps constant watch on the status of the master and of the replication status of each slave.&lt;/p&gt;

&lt;h2&gt;
  
  
  Leader Election
&lt;/h2&gt;

&lt;p&gt;In the case of the master crashing, like in the following diagram, all the surviving sentinels get together and agree on the master being incapacitated and then proceed to vote for the next master, which is the slave with the most up to date replicated data.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--F3Yk4Vvq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3dwwolab1vqfosr9p1jr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--F3Yk4Vvq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/3dwwolab1vqfosr9p1jr.png" alt="Image description" width="623" height="375"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The reason we have 4 sentinels installed is for decisive majority vote among 3 when one of the hosts is down. Note that one of the sentinels is actually running on the staging Redis.&lt;/p&gt;

&lt;p&gt;Added to this new architecture is a set of load balancers. In our case, we use HAProxy. Now we are no longer relying on just DNS to send clients to the active Redis master node. Thanks to HAProxy’s health check capabilities, we can now reduce the load on the master Redis node by sending read traffic to all nodes, not just to the master.&lt;/p&gt;

&lt;p&gt;HAProxy has some built in Redis check capabilities, but unfortunately, it only checks to see if a Redis node is healthy. It doesn’t report back if it is a master or slave node. Luckily, it allows you to write your own set of commands into a tcp check. The following configuration snippet allows us to configure a backend pool to be used only for Redis write commands.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
tcp-check send info\ replication\r\n

tcp-check expect string role:master

tcp-check send QUIT\r\n

tcp-check expect string +OK

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Another backend pool that takes Redis read commands can be setup with the built-in Redis check:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;option Redis-check


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A configuration like this needs one more customization in your application, as you now have two endpoints for your Redis connections, one for reads and one for writes.&lt;/p&gt;

&lt;p&gt;Compared with the first approach we used, this setup is a cabinet model of government, where the cabinet members get together to decide if the executive head is incapacitated and then to take proper actions. For example, selecting the next executive head. The previous model is like the vice president model, where the vice president, in normal situations, does not serve any active purposes, but if the president is out, he/she is the automatic successor.&lt;/p&gt;

&lt;h2&gt;
  
  
  Cross Data Center Serving Infrastructure and Automatic Failover
&lt;/h2&gt;

&lt;p&gt;When we evolve into the multi-region distributed serving infrastructure (e.g. east coast and west coast), we have the following setup:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--yGRbjMw9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4xspn9tif273fgpq4ln4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--yGRbjMw9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4xspn9tif273fgpq4ln4.png" alt="Image description" width="623" height="546"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The reason we need to have a separate Redis cluster is for the high SLA (10ms response time average), which cannot be achieved with cross-continent access due to high network latency.&lt;/p&gt;

&lt;p&gt;In this setup, we have event handlers that register user activities in real-time to Redis. Our application server mostly performs read operations on Redis. The SLA on the event handler is not as high as the app server requirement (next page response time is OK), so we can pipe all event handlers’ traffic to the main Redis cluster and then replicate the data to the secondary clusters.&lt;/p&gt;

&lt;p&gt;In this setup, one particular sentinel configuration is worth noting — slave priority. On the secondary cluster, you must set:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;slave-priority = 0


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is because you would never want the new master to be in the secondary cluster. If you did, you would send all events from the primary cluster to this new master, replicating the data from the secondary cluster to the primary, increasing latency unnecessarily.&lt;/p&gt;

&lt;h2&gt;
  
  
  Scalable Redis Using Twemproxy
&lt;/h2&gt;

&lt;p&gt;As our business grows, our data outgrows the capacity of any single node’s memory with acceptable cost. Unfortunately, Redis cluster is still a work in progress. However, others in the industry (more specifically, Twitter) have encountered and solved the data distribution problem in an acceptable way and then open-sourced it. The project is called twemproxy, which we adopted after some study and testing.&lt;/p&gt;

&lt;p&gt;The following diagram depicts our setup. We have the production Redis sharded N ways behind 2 twemproxy servers.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--f7aYMNxg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ke3qxt761hzottwv56yp.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--f7aYMNxg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ke3qxt761hzottwv56yp.png" alt="Image description" width="515" height="406"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this setup, a fault in twemproxy server is handled by DNS setup with health checks. Unlike the first setup we used, the multiple CNAME records but use weighted routing policy with the same weight, so both twemproxy servers can route traffic to Redis, avoiding a single bottleneck while achieving higher availability.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--G2zg01sq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ffvnt7t9ffud757e0igt.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--G2zg01sq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ffvnt7t9ffud757e0igt.png" alt="Image description" width="378" height="363"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;A fault in a Redis master is handled by twemproxy using a configuration called auto_eject_hosts.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;auto_eject_hosts: true


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We followed the liveness recommendation section of the twemproxy document. In short, twemproxy detects Redis host failure with some retries and timeout built-in to avoid false alarms. Once such a situation is detected, it ejects the host from the Redis data distribution pool, using:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;distribution: katema


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The accessibility of data on the surviving nodes is preserved.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Tips for Twemproxy Configuration&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;1) Similar to Fault Prevention Tip 3, remember to set the Linux host’s ulimit for open files to an appropriately high number for your application, now that the application is actually connecting to a twemproxy host instead of Redis hosts.&lt;br&gt;
2) If you, like us, have a staging replication setup similar to the diagram above, here is an important configuration to note. In nutcracker.yml, for each pool of Redis nodes, there is a servers configuration like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;servers:

- 10.34.145.16:6379:1 server1

- 10.32.54.221:6379:1 server2

- 10.65.7.21:6379:1 server3


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the production settings, the Redis servers’ IP addresses, naturally, are different from those of the staging Redis servers’ IP addresses. However, make sure to put in the server names (in the above case, server1, server2, server3), even though the server name is optional. Also make sure the order of the servers correspond to the replication setup. That is, production server1 replicating to staging server1, and so on. The reason for this is that the data distribution hash will be based on the server name when it is provided. Making the production and staging server names and sequence the same ensures that the data distribution is consistent between production and staging. We had started without this configuration and found out only one-third of the data was accessible in staging.&lt;/p&gt;

&lt;h2&gt;
  
  
  Trade-offs of Using Twemproxy vs. Previous Solutions
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Drawbacks of using twemproxy:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The data on the crashed node is not accessible until it comes back into service.&lt;/li&gt;
&lt;li&gt;The data written to the other nodes during the downtime of the crashed node is not accessible after it is revived.&lt;/li&gt;
&lt;li&gt;Only a subset of the Redis commands are supported. We had to rewrite part of our application to make this migration work. For example, we used to use Redis pipelining for higher throughput but it is not supported by twemproxy (I guess, due to the incompatibility between the pipelining intrinsic sequential nature and twemproxy’s parallel nature).&lt;/li&gt;
&lt;li&gt;During the downtime of a production Redis node, say Redis A, staging twemproxy would be out of sync with its Redis cluster because: the staging data is replicated from production Redis nodes directly, not distributed through the staging twemproxy; the production twemproxies distribute data among the live nodes, Redis B and Redis C; the staging twemproxy, not knowing production Redis A is down, seeing all nodes (staging Redis A, B, C) alive, following the original data distribution scheme based on 3 shards, would sometimes go to the staging Redis A, where the replication has already stopped from production Redis A, resulting in a miss.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Gains of using Twemproxy:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Naturally, with this architecture, we are able the scale up our Redis capacity three times. If data grows even more, we can linearly scale up.&lt;/li&gt;
&lt;li&gt;Additionally, in our throughput test, we discovered that twemproxy’s throughput is actually higher than the previous setup, even though there is an extra hop in data transmission. Twemproxy’s own documentation claims that the internal pipelining implementation cuts down the overhead and improves performance.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To us, the gains are more than the sacrifices. So we continue to use twemproxy.&lt;/p&gt;

&lt;h2&gt;
  
  
  Cross Data Center Scaling
&lt;/h2&gt;

&lt;p&gt;When we scale the multi-region distributed serving infrastructure, we have the following setup:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--NRUdsN_5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0qfkjw9giiag7a3qo5ac.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--NRUdsN_5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0qfkjw9giiag7a3qo5ac.png" alt="Image description" width="516" height="531"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this setup, all regions can be scaled up at the same time.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusions
&lt;/h2&gt;

&lt;p&gt;In our evolution of Redis deployment strategy:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We used DNS failover settings to achieve fault tolerant Redis cluster for our first implementation.&lt;/li&gt;
&lt;li&gt;We used Redis Sentinel for automatic failover for our second version.&lt;/li&gt;
&lt;li&gt;We are using the open source twemproxy package to scale the capacity and the twemproxy automatic failover setting for fault tolerance.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Along this journey, Redis has proven to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Have high performance (high throughput and low latency).&lt;/li&gt;
&lt;li&gt;Contain convenient data structures for diverse application use cases.&lt;/li&gt;
&lt;li&gt;Require a lot of thought and surrounding infrastructure when fault tolerance is a requirement.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;And so you can see our journey has been an interesting one — so far. We’re sharing these tips and pitfalls in the hope that they will help smooth your way in adopting this wonderful technology.&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Introduction to Distributed Solr Components</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 12:28:03 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-introduction-to-distributed-solr-components-2l4h</link>
      <guid>https://forem.com/bloomreach/discovery-introduction-to-distributed-solr-components-2l4h</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Suchi Amalapurapu &amp;amp; Ronak Kothari from Bloomreach, 2015&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;We use Solr to power Bloomreach’s multi-tenant search infrastructure. The multi-tenant search solution caters to diverse requirements/features for tenants belonging to different verticals like apparel, office supplies, flowers &amp;amp; gifts, toys, furniture, housewares, home furnishings, sporting goods, health &amp;amp; beauty. Bloomreach’s search platform provides high-quality search results with a merchandising service that supports a number of different configurations. Hence the need for a number of search features which are implemented as distributed Solr components in SolrCloud.&lt;/p&gt;

&lt;p&gt;This blog goes over how Solr distributed requests work and includes an illustration of a custom autocorrect component design and discusses the various design considerations for implementing distributed search features.&lt;/p&gt;

&lt;h2&gt;
  
  
  Lifecycle of a Solr search query
&lt;/h2&gt;

&lt;p&gt;Search requests in Solr are served via the SearchHandler, which internally invokes a set of callbacks as defined by SearchComponent. These components can be chained to create custom search functionality without actually defining new handlers. A typical search application uses several search components implementing custom search features in each component. Some sample examples are QueryComponent, FacetComponent, MoreLikeThis, Highlighting, Statistics, Debug, QueryElevation.&lt;/p&gt;

&lt;p&gt;The lifecycle of a typical search query in Solr is as follows:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--9O51OB3O--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/m3qdw6ivjfcyzupz4s0z.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--9O51OB3O--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/m3qdw6ivjfcyzupz4s0z.png" alt="Image description" width="203" height="311"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Request Flow
&lt;/h2&gt;

&lt;p&gt;This section goes over the execution flow of requests in SearchHandler and how the callbacks are invoked for SearchComponent. Solr requests are of two types — single shard (non-distributed mode) and multi-shard (distributed mode).&lt;/p&gt;

&lt;h2&gt;
  
  
  Non-distributed or single shard mode
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Architecture&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In Solr’s non-distributed mode (or single shard mode), the index data needed to serve a search request resides on a single shard. The following diagram describes the request flow for this scenario:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--yvaYaWN9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/swfja3xm5oowvv8ffs0v.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--yvaYaWN9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/swfja3xm5oowvv8ffs0v.png" alt="Image description" width="212" height="502"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Implementation details&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;SearchHandler invokes each search component’s prepare methods in a loop followed by each component’s process methods. Components can work on the response of components invoked before itself.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Example – Autocorrect&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Let’s consider the autocorrect feature wherein, the user query is spell-corrected when the query does not have any search results in the index. For example, user query “sheos” is autocorrected to “shoes”, since there are no search results for “sheos”.&lt;/p&gt;

&lt;p&gt;This feature needs three components QueryComponent, SpellcheckComponent and AutocorrectComponent. QueryComponent is used for default search. If the user-defined query does not return any results from the index, SpellcheckComponent adds a spellcheck suggestion. AutocorrectComponent further uses the spellchecked query to return search results. If the user-defined query is spelled correctly, AutocorrectComponent does not modify the response. In this case, SpellcheckComponent and AutocorrectComponent are added as last components. The following are the key steps involved:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A search request, response builder objects are created in SolrDispatcher.&lt;/li&gt;
&lt;li&gt;A distributed search request is initiated for query “sheos.”&lt;/li&gt;
&lt;li&gt;QueryComponent returns 0 numResults for the query.&lt;/li&gt;
&lt;li&gt;SpellcheckComponent, which gets invoked after QueryComponent, adds a spell-corrected suggestion, “shoes.”&lt;/li&gt;
&lt;li&gt;Based on numResults being 0 and SpellcheckComponent’s suggestion, AutocorrectComponent reissues the search request with corrected query “shoes”&lt;/li&gt;
&lt;li&gt;Response formatting.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Distributed or multi-shard mode
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Architecture&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;A request is considered distributed (or multi-shard) when the index of a collection is partitioned into multiple sub-indexes called shards. The request could hit any node in the cluster. This node needn’t necessarily have data for the intended search request. The node that received the request executes the distributed request in multiple stages and on different shards that contain the actual index. The responses from each of these shards is further merged to get the final response for the query.&lt;/p&gt;

&lt;p&gt;SearchComponent callbacks for a distributed search request differ from its non-distributed counterpart. A search request gets executed by the SearchHandler, which is distributed aware. The callback distributedProcess is used to determine the search component’s next stage across all search components. Search components have the ability to influence the next stage of a distributed request via this callback. Search components can spawn more requests in each stage and all these requests get added to a pending queue. Eventually non-distributed requests get spawned for each of these pending distributed requests, which in turn get executed on each shard. These responses are collated in handleResponses callback. Any post processing work can be further done in finishStage. Please refer to WritingDistributedSearchComponents for a detailed understanding of how distributed requests work.&lt;/p&gt;

&lt;p&gt;The life cycle of a distributed Solr search request can be depicted as follows:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--gexEKW4S--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wqy8tuea0rjq0lyqg5xo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--gexEKW4S--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wqy8tuea0rjq0lyqg5xo.png" alt="Image description" width="630" height="836"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Example – Autocorrect&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Let’s consider the functionality of autocorrect in distributed mode. The difference from its non-distributed counterpart is that the spell-corrected response could come from any shard and might not necessarily be present on each shard. So the autocorrect component that we had defined earlier has to process SpellcheckComponent’s response only after letting it collate the responses from all shards. (SpellcheckComponent in Solr is already distributed aware.)&lt;/p&gt;

&lt;p&gt;Please note that this can be better achieved as two search requests instead of a single complicated request. However, the hypothetical implementation in this section illustrates a custom distributed autocorrect component.&lt;/p&gt;

&lt;p&gt;The new autocorrect component has to be defined after the SpellcheckComponent in the list of search components defined in solrconfig.xml. This is to facilitate stage modification of the request by processing the response of SpellcheckComponent.&lt;/p&gt;

&lt;p&gt;Implementation details&lt;/p&gt;

&lt;p&gt;A new stage called STAGE_AUTOCORRECT is added right after STAGE_GET_FIELDS. This functionality goes in distributedProcess of AutocorrectComponent, which checks if the number of results for the given query are 0 and if SpellcheckComponent provides a suggestion for a corrected query. Autocorrect component modifies the request query to the spell suggestion and resets the stage to STAGE_START.&lt;/p&gt;

&lt;p&gt;The new search request is then executed as a distributed search request. A couple of flags have to be used to avoid getting stuck in an infinite loop here, since we are actually resetting the stage. These steps can be listed as:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A search request, response builder objects are created in SolrDispatcher.&lt;/li&gt;
&lt;li&gt;A distributed search request is initiated for query “sheos” on any node in the cluster with STAGE_START.&lt;/li&gt;
&lt;li&gt;The request goes smoothly till STAGE_GET_FIELDS, where the next stage is set to STAGE_AUTOCORRECT.&lt;/li&gt;
&lt;li&gt;STAGE_AUTOCORRECT is where the numResults condition and SpellcheckComponent suggestion is checked. If the query needs to be autocorrected, a new user query is set on the request and the stage set back to STAGE_START. Set an additional flag on the response to skip this stage next time.&lt;/li&gt;
&lt;li&gt;Search request goes through all the stages again with the spell-checked query.&lt;/li&gt;
&lt;li&gt;Response formatting.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Learnings
&lt;/h2&gt;

&lt;p&gt;There are several key takeaways from our experiences with scaling distributed search features in Solr. These can be summarized as:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;System Memory and resources&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Any additional search requests created in custom search components should be closed explicitly to avoid leaking resources. Otherwise, this could cause subtle memory leaks in the system. Many search features that we authored had to be tested thoroughly to avoid performance overheads.&lt;/li&gt;
&lt;li&gt;Solr components that load additional data in the form of external data will either need core reloads or ability to be updated via request handlers. Such components should avoid using locks to avoid performance issues when serving requests.&lt;/li&gt;
&lt;li&gt;The throughput of a multi-sharded Solr cluster is usually lower than a non-sharded one (depending on factors like number of shards and stages of a query).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Design practices&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Its better to split a complicated search request into multiple requests, rather than adding too many stages in the same request — more so in a multi-tenant architecture, where we can mix and match features, based on customer requirements via request-based params or collection-specific default settings.&lt;/li&gt;
&lt;li&gt;In some cases, moving this functionality out of Solr simplified the component design.&lt;/li&gt;
&lt;li&gt;One more caveat that we realized: The single shard collections hosted on SolrCloud do not follow the distributed request life cycle and instead use the non-distributed callbacks. So in general, Solr components should be designed to work in both modes, so that resharding or merging shards has no effect on search functionality later on.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Solr exposes interfaces to allow users to customize search components. Both the handlers, as well as search components, can be extended for custom functionality. However these components have to be carefully designed in a multi-shard mode to capture the intended functionality. Thorough testing and monitoring for a sustained period of time to detect any subtle leaks or performance degradation is essential in such production systems.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;References&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="http://wiki.apache.org/solr/WritingDistributedSearchComponents"&gt;http://wiki.apache.org/solr/WritingDistributedSearchComponents&lt;/a&gt;&lt;br&gt;
&lt;a href="https://wiki.apache.org/solr/SolrCloud"&gt;https://wiki.apache.org/solr/SolrCloud&lt;/a&gt;&lt;br&gt;
&lt;a href="https://wiki.apache.org/solr/SearchComponent"&gt;https://wiki.apache.org/solr/SearchComponent&lt;/a&gt;&lt;br&gt;
&lt;a href="http://lifelongprogrammer.blogspot.in/2013/05/solr-refcounted-dont-forget-to-close.html"&gt;http://lifelongprogrammer.blogspot.in/2013/05/solr-refcounted-dont-forget-to-close.html&lt;/a&gt;&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Identifying New Product Groupings in E-Commerce</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 12:14:42 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-identifying-new-product-groupings-in-e-commerce-4kd1</link>
      <guid>https://forem.com/bloomreach/discovery-identifying-new-product-groupings-in-e-commerce-4kd1</guid>
      <description>&lt;p&gt;&lt;strong&gt;&lt;em&gt;Blog written by: Padmini Jaikumar from Bloomreach, 2016&lt;/em&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Despite online shopping’s increasing popularity, finding products that you’re interested in on an e-commerce website can be a tiresome experience. Product groupings exactly matching a user’s intent may not exist; search results can be limited and category pages may be too broad.&lt;/p&gt;

&lt;p&gt;This blog post discusses algorithms developed at Bloomreach that automatically create new product groupings to better aid users in their discovery process. Our algorithms use a combination of site content and user behavior to identify new, interesting product groupings.&lt;/p&gt;

&lt;p&gt;In fact, the algorithms have been used to identify thousands of new groupings for merchants spanning many verticals. These new groupings have led to significant revenue impact for these merchants, leading us to conclude that these new groupings have had a measurable impact on improving users’ product discovery process.&lt;/p&gt;

&lt;h2&gt;
  
  
  Problem Statement
&lt;/h2&gt;

&lt;p&gt;Quickly finding products of interest on an e-commerce site can be a tedious process. There usually aren’t pages that display product groupings that exactly match the user’s intent. Category pages containing the products the user is interested in are usually pretty broad and have limited navigation facets for filtering. Searching on the website can return limited or irrelevant results. &lt;/p&gt;

&lt;p&gt;Fig.1 shows such an example, live from a merchant’s site. In this case, a user is interested in finding cream-colored men’s sweaters. There is no category page exactly matching the user’s intent. Searching on the website using site search for “men’s cream sweaters,” returns only one product (Fig.1a), while there are clearly many more relevant products in the retailer’s inventory (Fig.1b). If a user, dissatisfied with this search result, broadened the search query to “men’s sweaters,” there would be 151 products returned, with the only filtering options being “Price,” Customer Rating” and “Best Selling.”&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--1hbpXQDv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uqs9kif4akqqit0xkg8g.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--1hbpXQDv--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uqs9kif4akqqit0xkg8g.jpg" alt="Image description" width="800" height="460"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Fig 1. The image on the left shows the search results for “men’s cream sweaters”. The image on the right shows products on the site that fit the description, but were not returned in the search results.&lt;/p&gt;

&lt;p&gt;Unfortunately, today this experience may be more the rule than the exception. From the merchant’s standpoint, however, this is not an easy problem to fix. With the huge flux in online inventory, thousands of items are being added and removed every day. These items may be insufficiently or incorrectly tagged by suppliers. Search in the presence of such noisy or missing attributes leads to poor quality, as seen in Fig.1. Products also go in and out of season and fashion, and in and out of stock. Finally, customers express their intent in a myriad of different ways.&lt;/p&gt;

&lt;p&gt;As the number of different intents is practically infinite, manually creating the right content for every intent of every user is next to impossible. To overcome these problems, we have developed novel algorithms that automatically identify product groupings, thereby improving the user navigation experience. These algorithms are discussed in detail in the subsequent sections.&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating New Product Groupings
&lt;/h2&gt;

&lt;p&gt;At Bloomreach, we have developed algorithms to automatically create new focused, user-friendly product groupings to aid navigation. These groupings are refreshed every day to keep up with the flux in online inventory, taking the pain out of manually curating such pages.&lt;/p&gt;

&lt;p&gt;Our algorithm for creating these new groupings has the following novel characteristics:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Understanding Synonyms&lt;/strong&gt; - We determine contextual synonym correlations and use this to tag a product with the different ways users describe it.  This enables products to be part of groupings they would not have been in the past, based on the attributes provided by the merchant. We determine these correlations by analyzing the distribution of the products viewed based on particular user queries.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Content Based Clustering&lt;/strong&gt; - We have developed a rich product-attribute dictionary so that pages can be tagged more richly. We rank attributes in this dictionary based on user demand; and we cluster pages with popular attributes to create new groupings.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;User Behavior Based Clustering&lt;/strong&gt; - We determine sets of products that are often bought together by analyzing co-bought products from user buying patterns. We do this by clustering user sessions and by extracting maximum cliques in a product association graph.&lt;/p&gt;

&lt;p&gt;Fig. 2 shows examples of product groupings automatically created by our algorithms.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--HUmf9e5y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4m3c0gqnp4bndkjs5qbm.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--HUmf9e5y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/4m3c0gqnp4bndkjs5qbm.jpg" alt="Image description" width="800" height="594"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Fig 2. The image on the left shows the grouping created for “Prom sleeveless party dress”. The grouping on the right was created for “Decorative lumbar pillow cover”.&lt;/p&gt;

&lt;p&gt;These algorithms have been used to identify thousands of new product groupings for merchants spanning many verticals, which has had significant revenue impact for these merchants. These groupings increase revenue by improving users’ experience in different parts of their buying journey.  A grouping like “Blue lace dress,” helps users browse a gallery of similar products to easily compare them and perhaps buy one. Other groupings of related products, like “Canon DSLR camera accessories,” help someone starting out on the buying process quickly access the range of accessories to complement a product they might have bought already.&lt;/p&gt;

&lt;p&gt;These groupings are linked from top-level category pages so users can access popular, specific combinations easily. They can also be linked from a specific product page so users can quickly access a gallery of similar products. These pages are also excellent landing pages for targeted campaigns or marketing emails.&lt;/p&gt;

&lt;p&gt;The subsequent sections discuss specific aspects of our algorithms in greater detail.&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding Synonyms
&lt;/h2&gt;

&lt;p&gt;Users describe products in a myriad of different ways. Fig. 3 illustrates this point, where 500 respondents described the fit of the dress in the figure in 148 unique ways.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--aYuFP3Zg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/xeos741mx9dc7dax8xt8.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--aYuFP3Zg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/xeos741mx9dc7dax8xt8.jpg" alt="Image description" width="800" height="272"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Fig 3. Results of describing the fit of the dress shown in the figure.&lt;/p&gt;

&lt;p&gt;Our goal is to build contextual intent synonym mapping, so that we can automatically tag a product with the different ways users describe it. This allows the product to be a member of new groupings that it could not have been a part of in the past, based on the limited attribute set provided by the merchant. This helps users view synonymous products in the same grouping without having to navigate or search again.&lt;/p&gt;

&lt;p&gt;We built a synonym dictionary as opposed to using resources like WordNet[1], since filtering based on context was hard. For example, we wanted to learn that “heels” is a synonym of “pumps” in the context of shoes, but not in the context of electrical equipment.&lt;/p&gt;

&lt;p&gt;User queries for which similar sets of products were viewed are considered synonymous queries. From synonymous queries, we extract phrases that have been surrounded by the same set of words across many queries. These phrases are then considered contextual, intent-based synonyms.&lt;/p&gt;

&lt;p&gt;Our algorithm is summarized below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;for each distinct query q:

v: set of products viewed

c: number of times each product was viewed

p(q): distribution over v by normalizing using sum of c



for q: QuerySet

    for q1: QuerySet

        if substrings(q,q1) || plurals(q,q1)

            q is not a synonym of q1

        else

            if JSD(p(q), p(q1)) &amp;lt; 0.1

                q is synonym of q1

            else

                q is not a synonym of q1

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here JSD is the Jensen-Shannon divergence which is a measure of the similarity between two probability distributions (Eq.1).  Small values of the Jensen-Shannon divergence indicate that the two distributions are very similar, while large values indicate that they are dissimilar. The Jensen-Shannon divergence is a symmetric and smoothed version of the Kullback-Liebler divergence (represented as D( || ) below), which is a non-symmetric measure of the difference between two probability distributions.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KxVmThdZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uwbzgm23awufvr5k3lmp.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KxVmThdZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/uwbzgm23awufvr5k3lmp.png" alt="Image description" width="301" height="34"&gt;&lt;/a&gt;&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--l9fm5XoW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wa645wde8yl0cfmue6lm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--l9fm5XoW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/wa645wde8yl0cfmue6lm.png" alt="Image description" width="151" height="30"&gt;&lt;/a&gt;&lt;br&gt;
Equation 1: Jensen-Shannon divergence&lt;/p&gt;

&lt;p&gt;The Kullback-Liebler divergence is the expectation of the logarithmic difference between the probabilities P and Q, which for discrete probability distributions is computed as:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--V__6l5MY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/je2goqq4jjdo2ujfm0id.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--V__6l5MY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/je2goqq4jjdo2ujfm0id.png" alt="Image description" width="255" height="38"&gt;&lt;/a&gt;&lt;br&gt;
Equation 2: Kullback-Liebler divergence.&lt;/p&gt;

&lt;p&gt;With the algorithm above, we identify synonymous queries such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;toddler holiday pajamas → kids christmas pyjamas&lt;/li&gt;
&lt;li&gt;winter coats → snow jackets, outerwear, parka&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;From this query mapping, we then identify unigram synonyms. For this we look for words that appear in the same context, meaning terms that have been surrounded by the same words many times. This helps us identify synonyms like:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;toddler → kids, children, babies&lt;/li&gt;
&lt;li&gt;pajamas → onesies, sleepwear, nightgown, pjs&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;There are some interesting benefits we receive based on our synonym-extraction algorithm:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We automatically capture new terms as synonyms, for instance, “gladiator sandals” were identified as synonyms for “strappy upper flats.”&lt;/li&gt;
&lt;li&gt;We identify common misspelling of words.&lt;/li&gt;
&lt;li&gt;We can understand brands better – for example, we learned that “mudd” makes junior clothing, since “mudd clothes” had a product distribution similar to “junior clothes.”&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These algorithms have helped us build a rich synonym dictionary. Using this dictionary, we enhance the attributes supplied for a product by the merchant with synonyms. As mentioned before, this helps capture the numerous ways users might describe this product and allows the product to be a part of new content groupings.&lt;/p&gt;

&lt;p&gt;As far as we are aware, such an approach for synonym extraction has not been tried before at this scale to help retailers provide better navigation.&lt;/p&gt;

&lt;p&gt;The next section discusses our algorithms to identify new product groupings.&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating New Product Groupings
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Clustering by Page Content&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Say we want to create pages grouping similar products (an example would be “Blue lace dress”), that facilitate easy comparison. There are two challenges here: making sure products are annotated richly and identifying which new groupings to create, since there are many similar product grouping pages we can create, but all of them may not be useful.&lt;/p&gt;

&lt;p&gt;For expanding the set of attributes a product is annotated with, we have created a rich-product-attribute dictionary, which elaborates on the set of attributes that can be associated with a product. This dictionary is built by:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Capturing commonly co-occurring words used to describe the product in the product description corpus, using techniques such as Latent Semantic Analysis [2].&lt;/li&gt;
&lt;li&gt;Capturing common co-occurring terms used along with the product in user queries.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Additionally, synonym expansion is used to annotate products more richly.&lt;/p&gt;

&lt;p&gt;Once products have been annotated with a wider set of attributes, we rank attribute and product combinations based on user demand, which is estimated using search queries on the website. For attribute product combinations that are popular, and have many matching products on the merchant’s site, we create new product groupings clustering these pages.&lt;/p&gt;

&lt;p&gt;Clustering Co-Bought Products&lt;/p&gt;

&lt;p&gt;This section discusses the process to identify complementary products like “Coffee Supplies,” that might group a “Coffee Maker,” “Coffee,” “Coffee Filter,” “Mugs” etc. We create such product groupings based on user buying behavior.&lt;/p&gt;

&lt;p&gt;We define a user session to be the the set of products a user bought over the span of a few days. For each user session, we compute a locality sensitive min hash. A locality sensitive min hash has the property that similar items map to the same “buckets” with high probability. Therefore user sessions with a lot of products in common have a high probability of hashing to the same bucket, while sessions with none or very few products in common will have a low probability. The number of user sessions in a bucket gives an idea of the popularity of that product set among all user sessions.&lt;/p&gt;

&lt;p&gt;Subsequently, we compute the most popular product per user-session group, as the pivot product for that group. Once we have the pivot product, we determine frequently bought products from co-bought information. Finally, ensuring that attributes are shared across this set ensures better quality. All thresholds are manually fine-tuned based on performance.&lt;/p&gt;

&lt;p&gt;Our algorithm is summarized here:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--RRjnY_10--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yycs79teyenocc7fk30q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--RRjnY_10--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yycs79teyenocc7fk30q.png" alt="Image description" width="330" height="742"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Identifying frequently bought sets of products
&lt;/h2&gt;

&lt;p&gt;There are some products that are rarely bought alone. For example, a parent buying a children’s coloring book is more likely to buy a couple of different ones as opposed to a single one. Another example is someone looking to buy a sugar jar. Quite often they are also looking for a flour jar, oil dispenser, salt and pepper shakers etc. These products are not similar products and tend to have a tighter correlation than products in the co-bought category. The co-bought clustering algorithm tended to return a broader set of products than these tighter sets, so we tried something different:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;For each pair of products in the user-session data, compute an association score as Eq. 2:&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--xkyR9J9Y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/u5hsbt4owz2truge9eds.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--xkyR9J9Y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/u5hsbt4owz2truge9eds.png" alt="Image description" width="336" height="35"&gt;&lt;/a&gt;&lt;br&gt;
Eq 2: Association score between A and B&lt;/p&gt;

&lt;p&gt;NA and NAB are the number of times product A was bought, and products A and B were bought together, respectively. A high association score indicates that the products are often bought together, while a low score indicates that they are not frequently bought together.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Build an undirected graph linking every two products with an association score higher than a threshold (manually fine-tuned).&lt;/li&gt;
&lt;li&gt;Identify maximum cliques in this graph. A clique is a subset of vertices of an undirected graph, such that there is an edge between every set of vertices. A maximum clique is the largest of all cliques in the graph. Since edges are  based on high association scores between products, a clique in the graph represents a set of products bought frequently together.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Using this algorithm, we were able to identify the examples discussed earlier as frequent sets. For clique computation we use the NetworkX Python library implementation of determining maximum cliques [3].&lt;/p&gt;

&lt;h2&gt;
  
  
  Concluding Remarks
&lt;/h2&gt;

&lt;p&gt;In this blog post we presented work we have done in identifying new, focused and interesting groupings that aid user navigation. Our algorithms create groupings that account for contextual synonyms and group similar and related products. Our work here serves to show that creating a compelling user experience in terms of ease of navigation, while being a hard problem, can lead to significant benefits.&lt;/p&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;p&gt;Scott Deerwester and Susan T. Dumais and George W. Furnas and Thomas K. Landauer and Richard Harshman, Indexing by latent semantic analysis, Journal of the American Society for Information Science, 1990&lt;/p&gt;

&lt;p&gt;&lt;a href="https://wordnet.princeton.edu/"&gt;https://wordnet.princeton.edu/&lt;/a&gt;&lt;br&gt;
&lt;a href="https://networkx.github.io/"&gt;https://networkx.github.io/&lt;/a&gt;&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Global Cassandra on AWS EC2 at Bloomreach</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 12:03:31 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-global-cassandra-on-aws-ec2-at-bloomreach-l5e</link>
      <guid>https://forem.com/bloomreach/discovery-global-cassandra-on-aws-ec2-at-bloomreach-l5e</guid>
      <description>&lt;p&gt;Blog written by: Jorge Rodriguez from Bloomreach, 2016&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;The Bloomreach platform team, which builds the infrastructure that runs our powerful products, was given a job to do:  create a datastore platform  capable of supporting heterogeneous workloads. This platform would be used across all applications at Bloomreach and implement the following high-level requirements:&lt;/p&gt;

&lt;p&gt;⇒ Support back-end pipelines (mostly map-reduce jobs) performing scans, lookups and writes with varying degree of throughput requirements and SLA’s.&lt;br&gt;
⇒ Support front-end applications, which require data produced by back-end pipelines in real-time to be served at low latencies to locations around the globe.&lt;br&gt;
⇒ Provide a platform that effortlessly scales to hundreds of customers with millions of pages and across millions of users.&lt;/p&gt;
&lt;h2&gt;
  
  
  How we started
&lt;/h2&gt;

&lt;p&gt;We opted for Cassandra for our distributed database system. We felt it best suited our requirements. It features:&lt;/p&gt;

&lt;p&gt;⇒ Linear scalability with a decentralized architecture with no masters.&lt;br&gt;
⇒ Proven fault tolerance on cloud infrastructure.&lt;br&gt;
⇒ Cross-data-center replication technology.&lt;br&gt;
⇒ All our required features are available in the open source version.&lt;/p&gt;

&lt;p&gt;We started with a single Cassandra cluster, which we set up on AWS EC2. For our initial configuration we:&lt;/p&gt;

&lt;p&gt;⇒ Used the SSD storage optimized i2.xlarge instance type.&lt;br&gt;
⇒ Assembled it with a network topology structure of a single back-end data center to support back-end pipelines and multiple front-end data centers to support our low-latency API’s.&lt;br&gt;
⇒ Enabled vnodes for the automated token range management.&lt;br&gt;
⇒ Used the native protocol datastax Java driver and configured our clients with all the goodness of round robin, token aware and data-center aware policy to route clients to the appropriate data center and node.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--U--_fSMZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vnu82pqv42bv92qytlg2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--U--_fSMZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vnu82pqv42bv92qytlg2.png" alt="Image description" width="774" height="352"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We considered using separate back-end and front-end clusters but decided against it, mostly due to our requirement for our front-end API’s to reflect changes made by back-end pipelines in real-time. If we wanted this feature using separate clusters, we would have to build the replication and maintain the consistency across the clusters ourselves. The Cassandra cross-data center replication strategy would take care of these things for us. It also greatly simplified the requirement for our data to be made available globally for our front-end applications.&lt;/p&gt;
&lt;h2&gt;
  
  
  Global Availability
&lt;/h2&gt;

&lt;p&gt;Bloomreach is a company with customers around the globe and our customers expect operational excellence, which includes great performance and being highly available to our front-end applications. Launching across a single AWS EC2 instance would only provide us with the same availability SLA of the given AWS region, which is only 99.95%. Additionally, in order to achieve our latency goals, we needed to co-locate our services as close as possible to our customers in order to reduce the network overhead.&lt;/p&gt;

&lt;p&gt;We decided to launch our cluster with front-end data centers across AWS regions in the United States (east and west) and in Europe (Ireland). Our cluster additionally supports expansion into any AWS region. By providing front-end data centers in multiple regions, our services become highly available, as all three of these regions would need to go down in order for us to suffer an outage. Additionally, we can serve from the closest location to our end users.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Gb2t1uWQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/p2q2h6ujzvap4qhunbdd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Gb2t1uWQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/p2q2h6ujzvap4qhunbdd.png" alt="Image description" width="737" height="557"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Snitch Strategy
&lt;/h2&gt;

&lt;p&gt;Snitches inform Cassandra about the network topology of the cluster. It specifies the IP address of the nodes in the cluster and which data center and racks they belong to.  This information allows requests to be routed efficiently and it allows a Cassandra cluster to distribute replicas by grouping machines into data centers and racks.&lt;/p&gt;

&lt;p&gt;As part of configuring our cluster, we need to consider the appropriate snitch strategy. On AWS EC2 Web services there exists the concept of internal and external IP addresses. Each instance receives one of each. Internal IP’s are only accessible within an AWS region, while external IP’s are accessible to the outside world. Ideally our nodes will communicate over internal IP’s with other nodes within the same region and over external IP’s with nodes in other regions in order to achieve the best performance.&lt;/p&gt;

&lt;p&gt;Cassandra provides an “Ec2MultiRegionSnitch” implementation.   This implementation would always connect over the public IP but detect when a node is in the same AWS EC2 region. Then it would replace the TCP connection with one over the private IP by leveraging the “gossiperStarting()” function of the snitch. This was just what we were looking to do!  However, we quickly realized this was not a good option for us for two reasons:&lt;/p&gt;

&lt;p&gt;1) It makes the assumption that an AWS EC2 region is a data center, which is false in our use case, as our back-end data center lives in the same region as one of our front-end data centers.&lt;/p&gt;

&lt;p&gt;2) It relies on the AWS API’s in order to determine the node’s region and its private IP address from the public one. And if there is one thing I’ve learned from extensively using AWS is that their least reliable component is their EC2 API’s. These are infamous for having outages. In fact, in the past  six months, there have been two such outages while existing EC2 instances were unaffected.&lt;/p&gt;

&lt;p&gt;Of the provided Cassandra snitch strategies, we found that we liked the PropertyFileSnitch the best. The reason is that it is explicit and does not depend on any external service hence it can not go down.  The PropertyFileSnitch configuration specifies the node’s IP, data-center name and rack information in a topology config file.   So we decided to extend this configuration to include the private IP as well. We called it the GlobalPropertyFileSnitch, and our topology file format was as follows for each node:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;public-ip&amp;gt;=&amp;lt;datacenter-name&amp;gt;:&amp;lt;rack-name&amp;gt;:&amp;lt;private-ip&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We named our data centers according to their function (i.e. us-east-1-frontend, us-west-2-frontend, us-east-1-backend). For the rack name, we used the AWS ec2 region of the instance (i.e. us-east-1e, us-east-1c, us-west-2a etc.). From this information, we could infer the region, so that nodes in different data centers, but the same region, could also connect over private IP.&lt;/p&gt;

&lt;p&gt;Our GlobalPropertyFileSnitch implementation is almost exactly that of the PropertyFileSnitch, with two main modifications:&lt;/p&gt;

&lt;p&gt;1) We modified the file parsing to also parse out the private IP address of the node. This allowed us to avoid using the AWS EC2 API to get this information.&lt;/p&gt;

&lt;p&gt;2) We overrode the “gossiperStarting()” function, as it is done in the Ec2MultiRegionSnitch, and additionally provided our own GlobalReconnectableSnitchHelper in order to base the reconnect decision on the AWS EC2 region and not the data center name as the ReconnectableSnitchHelper does.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Override

   public void gossiperStarting()

   {

       Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(localPrivateAddress));

       Gossiper.instance.register(new GlobalReconnectableSnitchHelper(this, getRegion(localPublicAddress), true));

       super.gossiperStarting();

   }

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Cluster Launched!
&lt;/h2&gt;

&lt;p&gt;It wasn’t long before we had a fully functioning cluster; and we launched it for our first few customers. We had fulfilled our first two goals. Our back-end pipelines were doing their thing and generating this awesome data, which was then served by our front-end API’s at low latencies and delivering great value to our customers. Everyone was happy and we were feeling great about our achievement, so it was thoroughly celebrated.&lt;/p&gt;

&lt;p&gt;The next day we recovered from our celebration slightly hungover and set on to achieve the third goal: We needed to scale our data platform across many customers and applications. We had our minds set on delivering this great value to all of our current and future customers.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Challenges of Scaling
&lt;/h2&gt;

&lt;p&gt;It has been discovered during many engineering projects that scaling can present a different set of challenges. This was especially true since our requirements were so diverse. As we started onboarding more customers onto our new platform, we began to experience the pains of operating a data platform at large scale in a cloud environment.&lt;/p&gt;

&lt;p&gt;In the remaining part of this blog post, I will list the toughest challenges we faced in trying to scale out our data platform. If you want to know how we resolved these challenges though, you’ll have to stay tuned for our upcoming blog posts.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Challenge 1 - Scaling a fixed resource (Cassandra) to support an elastic resource (EMR).&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--okKz7tza--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ey3g9xe0gdw8hfnfqjcd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--okKz7tza--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/ey3g9xe0gdw8hfnfqjcd.png" alt="Image description" width="717" height="678"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As we started to run more back-end pipelines against our new cluster across different customers and applications, we found that back-end pipelines tend to generate peak loads. These peaks could expand elastically through launching multiple map-reduce jobs simultaneously. We attempted to control the back-end load, but this is very difficult to do across various dev teams. Even within a single dev team it’s a tough task to control pipelines across a large number of customers to ensure we stay within the capabilities of our Cassandra back-end data center.&lt;/p&gt;

&lt;p&gt;We solved this problem with a one-two punch:&lt;/p&gt;

&lt;p&gt;1) A distributed throttling system. We called it the “rate-limiter.” It supports throttling based on operation (read, write, scan, connect) and supports independent throttling settings per application. In the third installment of our blog series on Cassandra at Bloomreach, we’ll detail how this solution works.&lt;/p&gt;

&lt;p&gt;2) Through rate-limiter we were able to stabilize our cluster.  This provided some relief to our cluster, but it was at a cost. We had converted EMR from an elastic resource to a fixed resource. This meant that in order to scale EMR, we now had to scale Cassandra. In our experience, adding and removing nodes on the cluster had been a difficult task, especially under heavy load. Automatically expanding and contracting the size of the cluster to match the peak loads was not an option. So we created the Elastic Cassandra solution, to be detailed in the fourth and final installment of our blog series.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Challenge 2 - Scaling for the Holidays&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Because Bloomreach currently works primarily with e-commerce enterprises, we can see extreme increases in our capacity during the week of Thanksgiving and Christmas. For example, this year we expect a 10x increase in traffic for our services.&lt;/p&gt;

&lt;p&gt;As previously mentioned, our experience with Cassandra is that expanding and contracting the size of the cluster is a costly operation. In our version of Cassandra and with our dataset, it usually takes us about six hours just to add a single node. Even worse, when removing a node while the cluster is under heavy load, we experience drastic latency spikes causing our SLA’s to get tossed out the window. Additionally, our Cassandra cluster infrastructure is already one of the most expensive systems we run at Bloomreach, so we really wanted to keep costs down.&lt;/p&gt;

&lt;p&gt;This forced us to think long and hard about strategies to increase our capacity while maintaining costs low and minimizing the number of nodes we needed to add. We were able to come up with two strategies, which in combination provided us enough capacity that we will not need to add a single node in order to achieve 10x throughput. In the spirit of the holidays, our next installment of this blog series will be about how we achieved this tall task.&lt;/p&gt;

&lt;h2&gt;
  
  
  More to Come …
&lt;/h2&gt;

&lt;p&gt;During this blog post, we described the strategy we employed and decisions we made while launching a globally available Cassandra cluster. Please stay tuned in the coming weeks for the next three installments of our blog series. The next installment, “Increasing Cassandra Capacity for the Holidays Without Adding Nodes”, will describe how we increased our capacity for 10x read throughput without adding any additional nodes to the cluster.&lt;/p&gt;

&lt;p&gt;The third installment, “Distributed QPS Monitoring and Throttling System,” will describe a system we built in order to improve the stability of our cluster by preventing it from being issued more operations than it can support.&lt;/p&gt;

&lt;p&gt;The fourth and final installment will describe our Elastic Cassandra solution. That’s how we solved the problem of horizontally scaling the number of concurrent map-reduce pipelines that we can run at any given time in order to support the elastic nature of map-reduce to it’s full capacity.&lt;/p&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
    <item>
      <title>Discovery | Increasing Cassandra Capacity for the Holidays Without Adding Nodes</title>
      <dc:creator>Rory Warin</dc:creator>
      <pubDate>Fri, 09 Jun 2023 11:55:29 +0000</pubDate>
      <link>https://forem.com/bloomreach/discovery-increasing-cassandra-capacity-for-the-holidays-without-adding-nodes-3mmk</link>
      <guid>https://forem.com/bloomreach/discovery-increasing-cassandra-capacity-for-the-holidays-without-adding-nodes-3mmk</guid>
      <description>&lt;p&gt;&lt;em&gt;&lt;strong&gt;Blog written by: Jorge Rodriguez from Bloomreach, 2016&lt;/strong&gt;&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;At Bloomreach we have a great site-search product, which is now integrated on a number of high-profile e-commerce websites.  It is called Search, Navigation and Personalization (SNAP).  Beginning the week of Thanksgiving until the end of the year, the peak volume of requests that our SNAP product processes is expected to skyrocket to two or three times the normal workload.  While expanding capacity for many of our API’s by simply adding hardware is a trivial matter, at our Cassandra database layer, this is a non-trivial operation.  As mentioned in the previous post, this is an operation that usually takes six hours for every instance we add.&lt;/p&gt;

&lt;p&gt;For this second instalment of our four-part series, we will discuss how we increased the throughput of our front-end data centers by 4x  for the holiday shopping season, without adding any additional internet nodes.&lt;/p&gt;

&lt;h2&gt;
  
  
  A tough requirement
&lt;/h2&gt;

&lt;p&gt;It was about two months back when we started mapping out our capacity requirements for the holiday season. Some things we took into consideration were:&lt;/p&gt;

&lt;p&gt;⇒ We like our systems to have a peak load of no more than 50% of the known capacity.&lt;br&gt;
⇒ During the holiday season, we expect to receive four times our normal peak workload.&lt;br&gt;
⇒ Our Cassandra infrastructure is already one of the most expensive infrastructures we are running at Bloomreach.&lt;/p&gt;

&lt;p&gt;Given these considerations, solving this problem by brute force would eat away at our profit margins.  As many may know, having healthy profit margins is critical for a startup’s valuation.   Instead, of brute force, we decided to re-evaluate our resources and look for optimizations that would make better usage of what we had already provisioned.&lt;/p&gt;

&lt;h2&gt;
  
  
  Resource utilization
&lt;/h2&gt;

&lt;p&gt;We began by identifying two key factors:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What is binding our front-end read throughput?&lt;/li&gt;
&lt;li&gt;What are our current front-end resources and how are they utilized?&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The answer to what is binding our front-end read throughput was simple.  We have a hard SLA of 125ms TP95 latencies for 50 key Cassandra lookups, but we like to operate at no more than half of that.  Our servers could technically handle a significantly higher workload than what we consider our capacity, however, once our known capacity is exceeded our latencies suffer.&lt;/p&gt;

&lt;p&gt;We also know that there is a direct correlation between CPU usage on the front-end nodes and the latency. When our front-end nodes exceed 40- to 45-percent CPU utilization, our latencies exceed our hard SLA requirement. Our analysis showed that the CPU utilization was impacted by three main factors:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Replication load from the back end.  As discussed in our previous post, our back-end pipelines generate the data that we serve.  These pipelines write to our back-end data center and immediately upon being written the data is replicated to three front-end nodes (since our replication factor is 3).  This replication load requires resources of course, so as the replication load increases, so do our latencies.&lt;/li&gt;
&lt;li&gt;Time spent doing disk IO operations. When reading data from an SSTable on disk, the processor needs to wait for the file to be read.  Similarly the same applies to writing data to disk.  Of course the use of SSD drives decrease this seek time, but it is nonetheless present.  This is why it’s extremely difficult to run Cassandra on a spinning disk and using SSD drives is highly recommended.&lt;/li&gt;
&lt;li&gt;Read request workload from our API’s.  This one is pretty obvious, as the number of read requests increases, so does the number of CPU cycles required to process those requests.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;We then proceeded to look at our current resources and how they were being utilized.  As previously mentioned, we run on AWS EC2, and we use i2.xlarge instances for our Cassandra internet nodes.  These instances are storage optimized and feature:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;4 High-Frequency Intel Xeon E5-2670 v2 (Ivy Bridge) Processors.&lt;/li&gt;
&lt;li&gt;30GB of RAM.&lt;/li&gt;
&lt;li&gt;800GB SSD drives.&lt;/li&gt;
&lt;li&gt;A steep cost of 7 cents per hour.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For the most part, we were making good use of our resources.  Though there was a clear gap in the memory usage, we were only using 12GB of the 30GB available.  What a waste!  We knew we could do more with what we had; we simply had to figure out how.&lt;/p&gt;

&lt;h2&gt;
  
  
  Key Cassandra Concepts
&lt;/h2&gt;

&lt;p&gt;In order to understand our approach to solving this problem, there are two key Cassandra concepts that should be understood.  I’ve simplified these concepts to the scope required to understand the optimizations that we made.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The first is the Cassandra Token Range&lt;/li&gt;
&lt;li&gt;The second concept is the Cassandra Read Path&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Cassandra Token Range&lt;/p&gt;

&lt;p&gt;Cassandra is a distributed system with a masterless architecture.  As such, it needs a way to distribute the data amongst its internet nodes.&lt;/p&gt;

&lt;p&gt;⇒ Each node in a Cassandra cluster is assigned a token range, which is a range of hashes defined by a partitioner.&lt;br&gt;
⇒ This range is defined by any possible long value.&lt;br&gt;
⇒ So the possible tokens lie between -9223372036854775808 to 9223372036854775807.&lt;br&gt;
⇒ Every key, when run through the partitioner hashes to a value in this range.&lt;/p&gt;

&lt;p&gt;The image below represents an eight node Cassandra cluster.  You can think of the circumference of the circle as the token range and each node is responsible for its chunk of the values.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--MaWq8vAJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/9gsnfzwp806v44oruo9i.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--MaWq8vAJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/9gsnfzwp806v44oruo9i.png" alt="Image description" width="257" height="245"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Please note this is a simplified version of the token range. Since we use virtual nodes, there are actually 8 x 32 token ranges to distribute amongst these internet nodes. But that additional complexity is beyond the scope of required knowledge to understand our approach.&lt;/p&gt;

&lt;h2&gt;
  
  
  Cassandra Read Path
&lt;/h2&gt;

&lt;p&gt;Assuming the same eight-node cluster, let’s consider what happens when a client sends a read request to Cassandra.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--CrGHlBzg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/odefglkbini4uta0cgz0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--CrGHlBzg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/odefglkbini4uta0cgz0.png" alt="Image description" width="493" height="403"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Firstly, the client will pick an internet node to which it will send the read request.  This node will become the “coordinator” for the request.  In this example, node 8 receives the read request for keys A,B,C and D.&lt;/li&gt;
&lt;li&gt;The coordinator node will then run the keys through the partitioner to determine the token value and will send a request to the internet nodes which own the range this token falls under.  So in this case, the coordinator node will:&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;⇒ Send one message to node 7 for key C.&lt;br&gt;
⇒ Send one message to node 4 for key B.&lt;br&gt;
⇒ Send two messages to node 2, for keys A and D.&lt;/p&gt;

&lt;p&gt;1) For each of these messages, the following interactions happen.  Pictured below, the red colored boxes represent memory operation and the dark blue colored boxes represent Disk I/O operation.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--U5u7EtJD--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qqmjmfp1n31km0pa5wpy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--U5u7EtJD--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/qqmjmfp1n31km0pa5wpy.png" alt="Image description" width="540" height="427"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;2) The MessagingService in the coordinator node will send a message to the internet node which has the required key.  Let’s call it the “owner” node.&lt;/p&gt;

&lt;p&gt;The owner node will then:&lt;/p&gt;

&lt;p&gt;⇒ Look for that particular key in the memtable, which contains recent writes.&lt;br&gt;
⇒ Look for that key in all of the SSTables for the given keyspace and column family.&lt;br&gt;
⇒ Merge all of the results to determine the most recent data for the key.&lt;br&gt;
⇒ Respond to the Coordinator node through the MessagingService.&lt;/p&gt;

&lt;p&gt;3) If any owner node is taking too long to respond, the Coordinator node will make a speculative retry.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;It will continue to wait for a response from the owner node, but at the same time it will send the same request to another internet node, which also has the same data (assuming the replication factor is &amp;gt; 1).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The speculative retry timeout is configurable at the column family level.  It can be configured to:&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;⇒ ALWAYS – Retry reads of all replicas.&lt;br&gt;
⇒ Xpercentile – Retry reads based on the percentile latency.&lt;br&gt;
⇒ Yms – Retry reads after specified milliseconds.&lt;br&gt;
⇒ NONE – Do not retry reads.&lt;br&gt;
⇒ The default value is 99th percentile.  But we discovered early on that this value is not aggressive enough for us, and having one or two nodes experiencing high latencies would cause us to exceed our desired performance.&lt;br&gt;
⇒ We configured our column families to use a 15ms speculative retry.  From our testing, this ensured good performance while not overburdening the nodes.&lt;/p&gt;

&lt;p&gt;Once all of the owner nodes have responded with the required data, the Coordinator node will respond to the client with the results.&lt;/p&gt;

&lt;h2&gt;
  
  
  Targeting Queries
&lt;/h2&gt;

&lt;p&gt;At Bloomreach, we had already developed what we called the Quality of Service API.  This API was designed to make our Cassandra data accessible to some of our applications, though it was not yet the primary source of serving infrastructure for our SNAP application.  &lt;/p&gt;

&lt;p&gt;One of the things we noticed through some experimentation was that the Coordinator nodes were doing a lot of work.  And that much of the CPU usage was related to the coordination work being done.  The storage proxy on the coordinator node had to:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Parse the query and determine which nodes own the data.&lt;/li&gt;
&lt;li&gt;Send one message through the messaging service for each key being received.&lt;/li&gt;
&lt;li&gt;Wait for responses to all of these messages, and potentially do speculative retries.&lt;/li&gt;
&lt;li&gt;Respond to the client with the results.&lt;/li&gt;
&lt;li&gt;Do all of this work with a limited number of available threads.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;We had the idea to reduce this workload by creating a smarter client.  This client would group the keys being requested for a query into the appropriate token range and then send the request directly to a node which owns that range.  The benefits of this approach were:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Reduced the number of keys per individual query.&lt;/li&gt;
&lt;li&gt;Most reads occur locally:
⇒ Avoids sending a message per key to peers in the data center.
⇒ Avoids having to wait for the responses from peers.
⇒ Our network resources were not overly utilized, but there is certainly CPU overhead of all of these network operations.  For a 50-key lookup there are 50 messages to send and await responses from.
⇒ Speculative retries are a rare occurrence.&lt;/li&gt;
&lt;li&gt;Moved much of the sending and receiving of messages to a service where we have significantly more threads to deal with these operations.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--TjCxY4yw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/p66cnfhei4ihhjeep28c.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--TjCxY4yw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/p66cnfhei4ihhjeep28c.png" alt="Image description" width="411" height="387"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Additionally, we run our QOS API on internet nodes which are CPU optimized. This makes the cost per CPU core 1/5th of the cost per CPU on our Cassandra nodes. With this change, we observed a 50% increase in our capacity and it was accompanied by a nice boost in performance as well.  While that was not our primary goal, it was certainly a very positive impact.&lt;/p&gt;

&lt;h2&gt;
  
  
  Putting our RAM to work
&lt;/h2&gt;

&lt;p&gt;The second change we made was to make better utilization of our memory to reduce CPU load. For this change, we simply had to leverage a feature of Cassandra which we were not leveraging, the Row Cache.&lt;/p&gt;

&lt;p&gt;Through some calculations, we determined that we could fit 35 percent of our relevant data into memory by allocating 10GB of memory per node to the row cache. Additionally, the cache hit ratio was extremely high at 80 to 85 percent!  This is because the most popular products are the most sought after. As illustrated by the updated MessagingService Diagram below, introducing the row cache avoids having to look on disk for the data in favor of retrieving it from memory.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KnxK8uQ6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/t0i3hed6v9xb171r1ikl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KnxK8uQ6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/t0i3hed6v9xb171r1ikl.png" alt="Image description" width="426" height="394"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Serving the data from memory reduces the CPU impact, which was our primary goal.  It also has the added benefit of being much faster.  So again, we sought to improve our throughput and we got a nice performance boost as well!&lt;/p&gt;

&lt;h2&gt;
  
  
  Results
&lt;/h2&gt;

&lt;p&gt;After we switched our SNAP API’s to retrieve data through our QOS API and enabled the Cassandra row cache, we re-ran our load tests.  And we were extremely pleased with the results.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We had achieved our goal of four times our existing capacity.&lt;/li&gt;
&lt;li&gt;We had added no additional instances.&lt;/li&gt;
&lt;li&gt;We improved our average latencies by 50 percent, without even trying!
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The only dangerous thing here is that we set a precedent, which we now have to live up to. When we focus on performance improvements, we’ll have to deliver these kinds of results to make our management team happy :).&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;We learned some important lessons through this exercise:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;It is really good practice to monitor all system metrics and to have a very good grasp on your resource utilization.&lt;/li&gt;
&lt;li&gt;Before going off and expanding your infrastructure, have a good understanding of why it is necessary. This means understanding what binds your throughput and considering ways to improve that bottleneck.&lt;/li&gt;
&lt;li&gt;If operating in the cloud, select your instance types carefully and thoughtfully. Understand what drives the cost of the instance and pick instances according to your true requirements. Usually this will take some performance and load-test cycles to get right.&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>bloomreach</category>
      <category>discovery</category>
      <category>saas</category>
    </item>
  </channel>
</rss>
