import time
import sys
from typing import Iterable, Any, Mapping, Union
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import streaming_bulk, bulk
from . import connections
from .search import Search
class Exporter:
    """
    Base class helper to export stuff to elasticsearch.

    Derive from this class and define the class attributes:

    - ``INDEX_NAME``: ``str``
        Name of the index, might contain a wildcard ``*``.

    - ``MAPPINGS``: ``dict``
        The `mapping <https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html>`__
        definition for the index.

    And optionally override the methods:

    - :meth:`.transform_document`
        Convert an object to one or more elasticsearch documents.

    - :meth:`.get_document_id`
        Return a unique id for the elasticsearch document.

    - :meth:`.get_document_index`
        Return an alternative index name for the document.
    """

    # Name of the elasticsearch index where things are exported
    INDEX_NAME: str = None

    # dict with mapping parameters
    MAPPINGS: dict = None

    def __init__(
            self,
            client=None,
            index_prefix: str = None,
            index_postfix: str = None,
            update_index: bool = True,
    ):
        """
        Create a new instance of the exporter.

        :param client:
            An optional instance of an elasticsearch.Elasticsearch compatible object.
            If omitted, ``elastipy.connections.get("default")`` will be used.

        :param index_prefix: ``str``
            Optional string that is put before the class-attribute ``INDEX_NAME``.

        :param index_postfix: ``str``
            Optional string that is put after the class-attribute ``INDEX_NAME``.

        :param update_index: ``bool``
            If ``True``, the elasticsearch index will be created or updated with
            the current ``MAPPINGS`` before the first export of a document.

        :raises ValueError:
            If ``INDEX_NAME`` or ``MAPPINGS`` is not defined (or empty)
            on the class.
        """
        for required_attribute in ("INDEX_NAME", "MAPPINGS"):
            if not getattr(self, required_attribute, None):
                raise ValueError(f"Need to define class attribute {self.__class__.__name__}.{required_attribute}")

        self._client = client
        self.index_prefix = index_prefix
        self.index_postfix = index_postfix
        self._do_update_index = update_index
        # names of indices already created/updated by this instance (name -> True)
        self._index_updated = dict()

    @property
    def client(self):
        """
        Access to the elasticsearch client.

        If none was defined in the constructor,
        ``elastipy.connections.get("default")`` is returned (and cached).
        """
        if self._client is None:
            self._client = connections.get()
        return self._client

    def index_name(self) -> str:
        """
        Returns the configured ``index_prefix-INDEX_NAME-index_postfix``.

        Prefix and postfix are only joined (with a dash) when they are set.

        :return: str
        """
        name = self.INDEX_NAME
        if self.index_prefix:
            name = f"{self.index_prefix}-{name}"
        if self.index_postfix:
            name = f"{name}-{self.index_postfix}"
        return name

    def search(self, **kwargs) -> "Search":
        """
        Return a new ``Search`` object for this index and client.

        :param kwargs: passed on to the ``Search`` constructor
        :return: Search instance
        """
        # NOTE: passes the raw constructor client (possibly None) on purpose,
        #   leaving default-connection lookup to ``Search``.
        return Search(index=self.index_name(), client=self._client, **kwargs)

    def transform_document(self, data: Mapping) -> Union[Mapping, Iterable[Mapping]]:
        """
        Override this to convert an exported object into elasticsearch document(s).

        It's possible to return a single ``dict`` or an iterable
        (list, generator, ...) of multiple documents.

        The default implementation returns ``data`` unchanged.

        :param data: ``dict``
            Single object from the list passed to :meth:`.export_list`
        :return: ``dict`` or iterable of ``dict``
        """
        return data

    def get_document_id(self, es_data: Mapping):
        """
        Override this to return a single elasticsearch object's id.

        The default returns ``None``, which lets elasticsearch generate an id.

        :param es_data: ``dict``
            Single object as returned by transform_document()
        :return: str, int etc..
        """
        return None

    def get_document_index(self, es_data: Mapping) -> str:
        """
        Override to define an index per document.

        The default function returns the result from ``index_name()``
        but it's possible to put objects into separate indices.

        For example you might define ``INDEX_NAME = "documents-*"``
        and ``get_document_index`` might return

        .. code-block:: python

            self.index_name().replace("*", es_data["type"])

        :param es_data: ``dict``
            Single document as returned by transform_document()
        :return: str
        """
        return self.index_name()

    def update_index(self) -> None:
        """
        Create the index or update changes to the mapping.

        Can only be called if ``INDEX_NAME`` does not contain a ``'*'``.

        :raises ValueError: for wildcard index names
        :return: None
        """
        if "*" in self.index_name():
            raise ValueError(f"update_index() can not be called for wildcard indices like '{self.index_name()}'")
        self._update_index(self.index_name())

    def delete_index(self) -> bool:
        """
        Try to delete the index. Ignore if not found.

        :return: ``bool``
            True if deleted, False otherwise.
            If the index name contains a wildcard ``*``,
            True is always returned.
        """
        # local import, kept from the original (presumably avoids an import cycle)
        from .aggregation.helper import wildcard_match

        name = self.index_name()
        try:
            self.client.indices.delete(index=name)
        except NotFoundError:
            return False

        # forget the deleted indices so the next export re-creates them
        self._index_updated.pop(name, None)
        if "*" in name:
            for key in list(self._index_updated):
                if wildcard_match(key, name):
                    self._index_updated.pop(key)
        return True

    def export_list(
            self,
            object_list: Iterable[Any],
            chunk_size: int = 500,
            refresh: bool = False,
            verbose: bool = False,
            verbose_total: int = None,
            file=None,
            **kwargs
    ):
        """
        Export a list of objects.

        :param object_list: ``sequence of dict``
            This can be a list or generator of dictionaries, containing the
            objects that should be exported.

        :param chunk_size: ``int``
            Number of objects per bulk request.

        :param refresh: ``bool``
            if ``True`` require the immediate refresh of the index
            when finished exporting.

        :param verbose: ``bool``
            If True print some progress to stderr
            (using `tqdm <https://pypi.org/project/tqdm/>`__ if present)

        :param verbose_total: ``int``
            Provide the number of objects for the **verbosity** if
            ``object_list`` is a generator.

        :param file:
            Optional string stream to output verbose info, default is ``stderr``.

        All other parameters are passed to
        `elasticsearch.helpers.bulk <https://elasticsearch-py.readthedocs.io/en/v7.10.1/helpers.html#elasticsearch.helpers.bulk>`__

        :return: ``tuple``
            Response of the elasticsearch ``bulk`` helper; the first element
            is the number of successfully exported documents.
        """
        # resolve the documented default once, so the progress output and
        # the final summary go to the same stream
        if file is None:
            file = sys.stderr

        def bulk_actions():
            for object_data in self._verbose_iter(object_list, verbose, verbose_total, file):
                es_data_iter = self.transform_document(object_data)
                # transform_document may return one document or an iterable of them
                if isinstance(es_data_iter, Mapping):
                    es_data_iter = [es_data_iter]

                for es_data in es_data_iter:
                    object_id = self.get_document_id(es_data)
                    index_name = self.get_document_index(es_data)

                    # honor the constructor's ``update_index`` flag
                    if self._do_update_index and index_name not in self._index_updated:
                        self._update_index(index_name)

                    action = {
                        "_index": index_name,
                        "_source": es_data,
                    }
                    if object_id is not None:
                        action["_id"] = object_id

                    yield action

        response = bulk(
            client=self.client,
            actions=bulk_actions(),
            chunk_size=chunk_size,
            refresh=refresh,
            **kwargs,
        )

        if verbose:
            # TODO: print error status
            print(f"{self.__class__.__name__}: exported {response[0]} objects", file=file)

        return response

    def get_index_params(self) -> dict:
        """
        Returns the complete index parameters used when creating the index.

        Override if you need to specialize things.

        :return: dict
        """
        return {
            "mappings": self.MAPPINGS
        }

    def _update_index(self, name: str) -> None:
        """
        Push ``MAPPINGS`` to the existing index ``name``, or create the index
        with :meth:`.get_index_params` if it does not exist.

        Either way the index is remembered so that :meth:`.export_list`
        does not update it again.
        """
        try:
            self.client.indices.get_mapping(index=name)
            self.client.indices.put_mapping(index=name, body=self.MAPPINGS)
        except NotFoundError:
            self.client.indices.create(index=name, body=self.get_index_params())

        self._index_updated[name] = True

    @classmethod
    def _verbose_iter(cls, iter, verbose: bool, count=None, file=None):
        """
        Yield every item of ``iter``, optionally printing progress to ``file``.

        Uses ``tqdm`` when installed (unless ``verbose == "simple"``),
        otherwise prints ``"<ClassName> i/count"`` at most once per second.
        """
        if not verbose:
            yield from iter
            return

        if file is None:
            file = sys.stderr

        # "simple" is just a unittest switch to bypass tqdm
        if verbose != "simple":
            # only the import is guarded; errors raised while iterating
            # must not silently fall back to re-iterating a consumed iterator
            try:
                import tqdm
            except ImportError:
                tqdm = None
            if tqdm is not None:
                yield from tqdm.tqdm(iter, total=count, file=file)
                return

        if count is None:
            try:
                count = len(iter)
            except TypeError:
                # generators etc. have no length
                pass

        last_time = None
        for i, item in enumerate(iter):
            now = time.time()
            if last_time is None or now - last_time >= 1.:
                last_time = now
                if count:
                    print(f"{cls.__name__} {i}/{count}", file=file)
                else:
                    print(f"{cls.__name__} {i}", file=file)
            yield item