Overview

configuration

By default, an elasticsearch host is expected at localhost:9200. There are currently two ways to specify a different connection.

from elasticsearch import Elasticsearch
from elastipy import Search

# Use an explicit Elasticsearch client (or compatible class)
client = Elasticsearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("user", "pwd")
)

# create a Search using the specified client
s = Search(index="bla", client=client)

# can also be done later
s = s.client(client)

Check the Elasticsearch API reference for all the parameters.
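
For example (a sketch with a few common elasticsearch-py 7.x parameters; the host and credentials here are made up):

from elasticsearch import Elasticsearch

client = Elasticsearch(
    hosts=[{"host": "example.com", "port": 9243}],
    http_auth=("user", "pwd"),
    use_ssl=True,   # encrypt the connection
    timeout=30,     # request timeout in seconds
)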

We can also set a default client at program start:

from elastipy import connections

connections.set("default", client)

# .. or as parameters, which get converted to an Elasticsearch client
connections.set("default", {"hosts": [{"host": "localhost", "port": 9200}]})

# get a client
connections.get("default")
<Elasticsearch([{'host': 'localhost', 'port': 9200}])>

Different connections can be specified with the alias name:

connections.set("special", {"hosts": [{"host": "special", "port": 1234}]})

s = Search(client="special")
s.get_client()
<Elasticsearch([{'host': 'special', 'port': 1234}])>
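
Since a "default" connection was registered above, a Search created without an explicit client should fall back to it:

# no client argument: the "default" connection is used
s = Search(index="bla")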

aggregations

More details can be found in the tutorial.

# get a search object
s = Search(index="world")

# create an Aggregation class connected to the Search
agg = s.agg_date_histogram(calendar_interval="1w")
# (for date-specific aggregations we can leave out the 'field' parameter;
#  it falls back to Search.timestamp_field, which is "timestamp" by default)

# submit the whole request
s.execute()

# access the response

list(agg.keys())
['1999-12-27T00:00:00.000Z',
 '2000-01-03T00:00:00.000Z',
 '2000-01-10T00:00:00.000Z',
 '2000-01-17T00:00:00.000Z']
list(agg.values())
[21, 77, 60, 42]

Without a metric, these numbers are the document counts.
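
To see metric values per bucket, a metric can be nested below the histogram; the dict_rows() method shown further down then carries the metric column along. A sketch, assuming the index has a numeric field named "temperature":

s = Search(index="world")

# metric methods return their parent aggregation
# (as the sibling-chaining in the next section shows),
# so agg is still the date histogram
agg = s.agg_date_histogram(calendar_interval="1w") \
    .metric_avg("avg-temp", field="temperature")
s.execute()

# each row combines the bucket key, its doc_count and the metric value
list(agg.dict_rows())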

The date histogram example above as a one-liner:

Search(index="world").agg_date_histogram(calendar_interval="1w").execute().to_dict()
{'1999-12-27T00:00:00.000Z': 21,
 '2000-01-03T00:00:00.000Z': 77,
 '2000-01-10T00:00:00.000Z': 60,
 '2000-01-17T00:00:00.000Z': 42}

nested aggregations and metrics

s = Search(index="world")

# the first parameter is the name of the aggregation
#   (if omitted it will be "a0", "a1", and so on)
agg = s \
    .agg_terms("occasion", field="occasion") \
    .agg_rare_terms("rare-excuses", field="excuse", max_doc_count=2) \
    .metric_avg("avg-length", field="conversation_length") \
    .metric_max("max-length", field="conversation_length") \
    .execute()

The rare_terms aggregation is nested inside the terms aggregation, and the two metrics are siblings of each other, nested inside rare_terms.
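
In the request body sent to elasticsearch this nests roughly as follows (a hand-written sketch of the aggregation structure, not library output):

{
    "occasion": {
        "terms": {"field": "occasion"},
        "aggregations": {
            "rare-excuses": {
                "rare_terms": {"field": "excuse", "max_doc_count": 2},
                "aggregations": {
                    "avg-length": {"avg": {"field": "conversation_length"}},
                    "max-length": {"max": {"field": "conversation_length"}}
                }
            }
        }
    }
}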

keys(), values(), items() and to_dict() all operate on the current aggregation. For bucket aggregations they typically show the doc_count value.

agg.to_dict()
{('dinner', 'my mouth is too dry'): 1,
 ('dinner', "i can't reach the spoon"): 2}

The rows(), dict_rows() and dump.table() methods operate on the whole aggregation branch:

list(agg.dict_rows())
[{'occasion': 'dinner',
  'occasion.doc_count': 200,
  'rare-excuses': 'my mouth is too dry',
  'rare-excuses.doc_count': 1,
  'avg-length': 163.0,
  'max-length': 163.0},
 {'occasion': 'dinner',
  'occasion.doc_count': 200,
  'rare-excuses': "i can't reach the spoon",
  'rare-excuses.doc_count': 2,
  'avg-length': 109.5,
  'max-length': 133.0}]
agg.dump.table(colors=False)
occasion │ occasion.doc_count │ rare-excuses            │ rare-excuses.doc_count │ avg-length   │ max-length
─────────┼────────────────────┼─────────────────────────┼────────────────────────┼──────────────┼─────────────
dinner   │ 200                │ my mouth is too dry     │ 1 ██████████▌          │ 163.0 ██████ │ 163.0 ██████
dinner   │ 200                │ i can't reach the spoon │ 2 ████████████████████ │ 109.5 ████   │ 133.0 ████▉
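
rows() works like dict_rows() but yields plain lists, with the column names in the first row. A sketch of the expected shape, derived from the dict_rows() output above:

list(agg.rows())
[['occasion', 'occasion.doc_count', 'rare-excuses', 'rare-excuses.doc_count', 'avg-length', 'max-length'],
 ['dinner', 200, 'my mouth is too dry', 1, 163.0, 163.0],
 ['dinner', 200, "i can't reach the spoon", 2, 109.5, 133.0]]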

queries

from elastipy import query

s = Search(index="prog-world")

# chaining means AND
s = s \
    .term(field="category", value="programming") \
    .term("usage", "widely-used")

# operators can be used as well
s = s & (
    query.Term("topic", "yet-another-api")
    | query.Term("topic", "yet-another-operator-overload")
)

# .query() replaces the current query
s = s.query(query.MatchAll())

languages_per_country = s.agg_terms(field="country").agg_terms(field="language").execute()

languages_per_country.to_dict()
{('IT', 'PHP'): 28,
 ('IT', 'Python'): 24,
 ('IT', 'C++'): 21,
 ('ES', 'C++'): 29,
 ('ES', 'Python'): 22,
 ('ES', 'PHP'): 18,
 ('US', 'PHP'): 23,
 ('US', 'Python'): 20,
 ('US', 'C++'): 15}
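
Exclusions and more involved boolean logic can be written with the bool query. A sketch, assuming elastipy exposes a query.Bool class mirroring the elasticsearch bool query (the field values here are made up):

s = Search(index="prog-world").query(query.Bool(
    must=[query.Term("category", "programming")],
    must_not=[query.Term("usage", "deprecated")],
))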

exporting

There is a small helper to export stuff to elasticsearch.

from elastipy import Exporter

class MyExporter(Exporter):
    INDEX_NAME = "my-index"

    # mapping can be defined here
    # it will be sent to elasticsearch before the first document is exported
    MAPPINGS = {
        "properties": {
            "some_field": {"type": "text"},
        }
    }

count, errors = MyExporter().export_list(a_lot_of_objects)

print(f"expored {count} objects, errors: {errors}")
expored 1000 objects, errors: []

It uses bulk requests and is quite fast. It also supports document transformation and control over the id and sub-index of each document.

import datetime

class MyExporter(Exporter):
    INDEX_NAME = "my-index-*"
    MAPPINGS = {
        "properties": {
            "some_field": {"type": "text"},
            "group": {"type": "keyword"},
            "id": {"type": "keyword"},
            "timestamp": {"type": "date"},
        }
    }

    # if each document has a unique id value we can use it
    # as the elasticsearch id as well. That way we do not
    # create documents twice when exporting them again.
    # Their data just gets updated.
    def get_document_id(self, es_data):
        return es_data["id"]

    # we can bucket documents into separate indices
    def get_document_index(self, es_data):
        return self.index_name().replace("*", es_data["group"])

    # here we can adjust or add some data before it gets exported.
    # it's also possible to split the data into several documents
    #   by yielding or returning a list
    def transform_document(self, data):
        data = data.copy()
        data["timestamp"] = datetime.datetime.now()
        return data

MyExporter().export_list(a_lot_of_objects)
(1000, [])

If we get tired of all that data we can call:

MyExporter().delete_index()
True

This will actually delete all sub-indices, because of the wildcard * in the INDEX_NAME.
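
Under the hood this amounts to a wildcard index deletion, roughly like this sketch (using the elasticsearch-py client directly; the actual internals may differ):

from elastipy import connections

connections.get("default").indices.delete(index="my-index-*")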