Extreme (Elastic)Search

everything is broken, complexity is born

 

borislav.nikolov@booking.com

What is ElasticSearch

Java-based distributed RESTful search service on top of Lucene indexes.

What is Lucene

an information retrieval library based on inverted indexes and the boolean model

What are Inverted Indexes

a few lines of code are worth a thousand words

my $book = [
        '... perl / ruby benchmark ...',      # 0
        '... perl / python benchmark ...',    # 1
        '... ruby / python benchmark ...',    # 2
        '... perl is awesome ...',            # 3
];

my $inverted = {
        'benchmark' => [ 0, 1, 2 ],
        'perl'      => [ 0, 1, 3 ],
        'ruby'      => [ 0, 2 ],
        'python'    => [ 1, 2 ],
};

# free stats:
my $n_documents      = scalar(@{ $book });
my $n_perl_documents = scalar(@{ $inverted->{perl} });
documents in the inverted index are sorted by document id

perl AND ruby

   my @ids = search($inverted, "perl AND ruby");
   # matching: $book->[0]  "... perl / ruby benchmark ..."

   my @ids = search($inverted, "perl OR ruby");
   # matching: $book->[0]  "... perl / ruby benchmark ..."
   #           $book->[2]  "... ruby / python benchmark ..."
   #           $book->[1]  "... perl / python benchmark ..."
   #           $book->[3]  "... perl is awesome ..." # below ruby/python?
   #                                                 # "ruby" is rarer than "perl", so it scores higher

again: What is Lucene

  • actively developed
  • open source
  • fast
  • extremely hackable

again: What is ElasticSearch

  • distributed
  • takes care of things like
    • replication
    • sharding
    • routing
  • awesome API
  • cool plugins
  • works out of the box

an invented use case

  • 100_000_000 documents
  • 200 words per document
  • 1000 searches per second
  • zero tolerance for failure
  • 10ms timeout
  • usual query has to score 1_000_000 documents based on relevance

everything in one place

count the even numbers from 0 up to 1_000_000
#include <stdio.h>
#include <sys/time.h>
int main(void) {
    int i, j = 0;
    struct timeval t0, t1;
    gettimeofday(&t0, 0);

    for (i = 0; i < 1000000; i++)
        if (i % 2 == 0)
            j++;

    gettimeofday(&t1, 0);
    printf("%ld\n", ((t1.tv_sec - t0.tv_sec) * 1000000 + t1.tv_usec - t0.tv_usec) / 1000);
    return 0;
}

took: 5ms

split the work into 120 pieces, took: 47us

estimate work per thread

  • split the 100m docs to 120 shards
  • thread has to score on average 8333 (1/120th of 1_000_000)
  • 5 x 24-core boxes can service a query within a few ms
  • 24 boxes with 5 cpus will also do the job

search

my @docs = ();
for (1.. (100_000_000 / 120)) { # 833333
    push @docs, { name => ($_ % 10 == 0 ? 'perl ruby' : 'perl') }
}
$engine->index(\@docs);
my $t0 = time();
my $results = search({
    bool => {
        must => [
            { term => { name => "perl"  } },
            { term => { name => "ruby" } }
        ]
    }
});
print "took: @{[ time() - $t0 ]}\n";

5.8ms when every 10th document matches

0.7ms when all matching documents are in a block

// ~14 times slower than
for(i = 0; i < 9000; i++) { if (i % 2 == 0) j++; }

perl AND ruby

{
    'perl' => [ 1,   3,   277, 46000, 64973, 78688, ... ], # 876_962 documents
    'ruby' => [ 300, 456, 736, 837,   7278 ... 50000, ... ] # 51_345 documents
}
  1. pick a leading query (the rarest term: 'ruby')
  2. leading 'ruby' is at 300
  3. advance 'perl' to 300 -> closest is 46000
  4. advance 'ruby' to 46000 -> closest is 50000
  5. repeat from 3. with target 50000

show me the code!

"perl AND ruby"

int doNext(int target) throws Exception {
    for(;;) {
        try_again: for(;;) {
            for (int i = 1; i < queries.size(); i++) {
                Primitive q = queries.get(i);
                if (q.docID() < target) {
                    q.advance(target);
                    if (q.docID() > target) {
                        target = q.docID();
                        break try_again; // goto try_again # thank you java
                    }
                }
            }
            return target;
        }
        // try_again:
        target = lead.advance(target);
    }
}
int next() throws Exception {
    return doNext(lead.nextDoc());
}
int doc;
while ((doc = scorer.next()) != NO_MORE_DOCS) {
    collect(doc);
}

why does that matter?

  • linear with number of matching documents
  • total number of documents is irrelevant
  • plan capacity
  • know your data, restructure if needed
  • plan for stress

Wait, so what is really going on here?

  • 100_000_000 documents
  • sorting by relevance (using complex ranking functions)
  • on the fly
  • within milliseconds
  • using only 5 boxes
  • we don't need to worry about sharding/replication/distributed work, thanks to ElasticSearch

120 shards, scatter/gather

  • application asks random nodeA
  • nodeA will ask one random node for each of the 120 shards
  • nodeA merges the results and replies to the application

we are good to go, except..

  • everything that takes 100% resources is bound to thrash on itself
  • no downtime
  • maintenance
  • monitoring
  • GC

add more boxes

  • + replicas: throughput and resilience
  • + shards: decrease latency

update/upgrade ElasticSearch/java/kernel

always stop shard reallocation first

  • rolling restart
  • cluster restart (hope, 99.9% uptime?)
  • cluster split (capacity?)
  • growing cluster (capacity?)

master

coordinates cluster wide changes, like:

  • addition/removal of nodes
  • create/delete/open/close of indexes
  • shard (re)allocation
  • index metadata

split brain and data loss

  • dedicated master nodes
  • discovery.zen.minimum_master_nodes = (N / 2) + 1 (quorum, e.g. 2 of 3)
  • zookeeper plugin

monitoring

  • working indexing threads
  • working search threads
  • heap
  • watching from outside is not enough
  • each node should monitor the whole cluster's
    • latency
    • loss
  • open file descriptors
    • segments
  • start with JMX so you can
    profile/thread dump/heap dump with a few mouse clicks

warstories

    In my humble experience I have seen (unreproducible):
  • random stop/100% cpu
  • reboot and never start again
  • random crash (panics, jvm death etc)
  • deadlock magic everywhere
  • broken tcp state
  • switch sending traffic only to ODD mac addresses
  • NIC firmware messups
  • everything already started keeps working, everything new hangs

mostly due to hardware issues, triggering weird behavior in unpredictable ways

how did we get here?

we just want to store and search

how to store JSON in Lucene

// read [ { "field": "value" }, .. ] from STDIN and store each hash as a Lucene document
// thank you stackoverflow
Analyzer whitespace = new WhitespaceAnalyzer(Version.LUCENE_48);
IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_48, whitespace);
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
IndexWriter writer = new IndexWriter(new NIOFSDirectory(ROOT),config);
JsonReader json = new JsonReader(new InputStreamReader(System.in, "UTF-8"));
Gson gson = new Gson();
Map<String,String> map = new HashMap<String,String>();

json.beginArray();
while (json.hasNext()) {
    map = (Map<String,String>) gson.fromJson(json, map.getClass());
    Document doc = new Document();
    for (Map.Entry<String,String> entry : map.entrySet())
        doc.add(new Field(entry.getKey(), entry.getValue(),
                Field.Store.YES,
                Field.Index.ANALYZED));
    writer.addDocument(doc);
}
json.endArray();
json.close();

how to search in Lucene

// return top documents matching 'perl AND ruby' as [ { "key":"value" }.. ]
IndexReader reader = DirectoryReader.open(writer,false);
IndexSearcher searcher = new IndexSearcher(reader);
Query q = new BooleanQuery();
q.add(new TermQuery(new Term("name","perl")),BooleanClause.Occur.MUST);
q.add(new TermQuery(new Term("name","ruby")),BooleanClause.Occur.MUST);
TopDocs results = searcher.search(q, null,100);
ScoreDoc[] hits = results.scoreDocs;
List<Map<String,String>> output = new ArrayList<Map<String,String>>();
for(int i = 0; i < hits.length; i++) {
    Document doc = searcher.doc(hits[i].doc);
    Map<String,String> item = new HashMap<String,String>();
    item.put("_score", String.valueOf(hits[i].score));
    for (IndexableField field : doc.getFields())
        item.put(field.name(), field.stringValue());
    output.add(item);
}
reader.close();
return output;

hack your own search

using sun.http or netty or jetty or whatever tty you like, you can easily hack your own search service

 

simple topN recipe

  • build 5 "shards" each containing 24 Lucene indexes
  • copy "shard" per box
  • get topN from 5 boxes
  • sort { $b->{_score} <=> $a->{_score} } @r
  • profit

and have your very own unexpected problems :)

summary

  • nothing is as easy as it seems
  • everything can and will break
  • just assume it is broken already
  • don't be afraid to hack your way through the forest
  • when it rains, it pours
  • Lucene 4.9
    http://lucene.apache.org/core/4_9_0/core/index.html
  • Lucene in Action
    http://amzn.com/1933988177
  • examples about Lucene's hackability
    https://github.com/jackdoe/drunken-octo-batman/
  • ElasticSearch
    https://github.com/elasticsearch/elasticsearch

thank you