Yente sometimes interrupts the initial indexing

Hi,

we use Yente in AWS with OpenSearch. Unfortunately, Yente sometimes interrupts during the initial indexing process.

2025-10-28T14:24:44.168097Z [error] Index update error: peer closed connection without sending complete message body (incomplete chunked read) [yente.search.indexer]
Traceback (most recent call last):
  File "/opt/venv/lib/python3.12/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions
    yield
  File "/opt/venv/lib/python3.12/site-packages/httpx/_transports/default.py", line 271, in __aiter__
    async for part in self._httpcore_stream:
  File "/opt/venv/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 407, in __aiter__
    raise exc from None
  File "/opt/venv/lib/python3.12/site-packages/httpcore/_async/connection_pool.py", line 403, in __aiter__
    async for part in self._stream:
  File "/opt/venv/lib/python3.12/site-packages/httpcore/_async/http11.py", line 342, in __aiter__
    raise exc
  File "/opt/venv/lib/python3.12/site-packages/httpcore/_async/http11.py", line 334, in __aiter__
    async for chunk in self._connection._receive_response_body(**kwargs):
  File "/opt/venv/lib/python3.12/site-packages/httpcore/_async/http11.py", line 203, in _receive_response_body
    event = await self._receive_event(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.12/site-packages/httpcore/_async/http11.py", line 213, in _receive_event
    with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/opt/venv/lib/python3.12/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/app/yente/search/indexer.py", line 217, in update_in_thread
    await update_index(force=force)
  File "/app/yente/search/indexer.py", line 208, in update_index
    await index_entities(provider, dataset, force=force)
  File "/app/yente/search/indexer.py", line 163, in index_entities
    raise exc
  File "/app/yente/search/indexer.py", line 146, in index_entities
    await provider.bulk_index(docs)
  File "/app/yente/provider/opensearch.py", line 236, in bulk_index
    await async_bulk(
  File "/opt/venv/lib/python3.12/site-packages/opensearchpy/_async/helpers/actions.py", line 301, in async_bulk
    async for ok, item in async_streaming_bulk(  # type: ignore
  File "/opt/venv/lib/python3.12/site-packages/opensearchpy/_async/helpers/actions.py", line 202, in async_streaming_bulk
    async for bulk_data, bulk_actions in _chunk_actions(
  File "/opt/venv/lib/python3.12/site-packages/opensearchpy/_async/helpers/actions.py", line 70, in _chunk_actions
    async for action, data in actions:
  File "/opt/venv/lib/python3.12/site-packages/opensearchpy/_async/helpers/actions.py", line 199, in map_actions
    async for item in aiter(actions):
  File "/app/yente/search/indexer.py", line 40, in iter_entity_docs
    async for data in updater.load():
  File "/app/yente/data/updater.py", line 115, in load
    async for data in load_json_lines(
  File "/app/yente/data/loader.py", line 99, in load_json_lines
    async for line in stream_http_lines(url, auth_token=auth_token):
  File "/app/yente/data/loader.py", line 69, in stream_http_lines
    async for line in resp.aiter_lines():
  File "/opt/venv/lib/python3.12/site-packages/httpx/_models.py", line 1031, in aiter_lines
    async for text in self.aiter_text():
  File "/opt/venv/lib/python3.12/site-packages/httpx/_models.py", line 1018, in aiter_text
    async for byte_content in self.aiter_bytes():
  File "/opt/venv/lib/python3.12/site-packages/httpx/_models.py", line 997, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
  File "/opt/venv/lib/python3.12/site-packages/httpx/_models.py", line 1055, in aiter_raw
    async for raw_stream_bytes in self.stream:
  File "/opt/venv/lib/python3.12/site-packages/httpx/_client.py", line 176, in __aiter__
    async for chunk in self._stream:
  File "/opt/venv/lib/python3.12/site-packages/httpx/_transports/default.py", line 270, in __aiter__
    with map_httpcore_exceptions():
         ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/opt/venv/lib/python3.12/site-packages/httpx/_transports/default.py", line 118, in map_httpcore_exceptions
    raise mapped_exc(message) from exc

The indexing failed once at ~6,000,000 entities and once at ~10,000,000 entities. Unfortunately, the indexing process then has to start all over again. Is there a way to make it more robust?

Our configuration:

  • Yente
    • Version: 4.5.1
  • OpenSearch
    • Version: OpenSearch 2.19
    • Instance type: m7g.medium.search
    • EBS volume size: 10 GiB

Thank you in advance!

Br,

Felix

Hi! We’ve seen this a few times. It’s an indication that the network connection is being disrupted, sometimes because indexing runs concurrently with the download and keeps the connection open for a very long time. One fix might be to set YENTE_STREAM_LOAD=false in the environment: Yente will then download the full data file first and index it afterwards. Of course, that means the indexer pod needs enough disk space for the data file (say, 5 GB to be safe).
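For reference, setting the variable might look like this (the deployment details are illustrative, not from this thread):

```shell
# Illustrative only: disable streaming so Yente downloads the full
# dataset file to local disk before indexing (needs ~5 GB of space).
export YENTE_STREAM_LOAD=false
```

In a container setup, the same variable would go into the pod or service environment rather than a shell profile.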

Let me know if this helps!

I don’t think the AWS network is the problem, but we will try the YENTE_STREAM_LOAD env var. The third attempt was successful. Do you know why Yente indexed 6,463,714 entities today? Over the last two days, the initial indexing has added ~12 million entities.

The high entity counts are just an artifact of retries: there are currently about 4 million entities in the dataset, but every time the connection gets reset, the indexer starts from scratch while keeping the index sequence intact.

This is harmless (indexing is idempotent), but the count shown in the logs then reflects the amount of “work done” rather than being bounded by the total size of the dataset.
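The effect can be sketched with a toy loop (numbers shrunk for illustration; this is a model of the behavior described above, not yente’s actual code):

```python
# Toy model of idempotent bulk indexing with retries: each retry
# redoes work, so the "entities indexed" counter can exceed the
# dataset size, while the index itself stays correct.
dataset = {f"ent-{i}": {"name": f"Entity {i}"} for i in range(5)}

index: dict = {}   # stands in for the OpenSearch index
work_done = 0      # what the log counter reports

for attempt in range(3):  # two simulated connection resets + one clean run
    for entity_id, doc in dataset.items():
        index[entity_id] = doc  # upsert by id: replaying is harmless
        work_done += 1

print(len(index))   # 5  -- bounded by the dataset size
print(work_done)    # 15 -- cumulative "work done" across retries
```

Replaying the same documents only overwrites them under the same id, which is why the final index is correct even though the counter overshoots.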

It works very well with the YENTE_STREAM_LOAD env var. Thank you very much for your support! 🙂