Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/neo4j/_async/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ async def fetch_all(self):
messages fetched
"""
detail_count = summary_count = 0
while self.responses:
while not self._closed and self.responses:
response = self.responses[0]
while not response.complete:
detail_delta, summary_delta = await self.fetch_message()
Expand Down
16 changes: 7 additions & 9 deletions src/neo4j/_async/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
check_supported_server_product,
CommitResponse,
InitResponse,
ResetResponse,
Response,
)

Expand Down Expand Up @@ -391,17 +392,14 @@ def rollback(self, dehydration_hooks=None, hydration_hooks=None,
dehydration_hooks=dehydration_hooks)

async def reset(self, dehydration_hooks=None, hydration_hooks=None):
""" Add a RESET message to the outgoing queue, send
it and consume all remaining messages.
"""

def fail(metadata):
raise BoltProtocolError("RESET failed %r" % metadata, address=self.unresolved_address)
"""Reset the connection.

Add a RESET message to the outgoing queue, send it and consume all
remaining messages.
"""
log.debug("[#%04X] C: RESET", self.local_port)
self._append(b"\x0F",
response=Response(self, "reset", hydration_hooks,
on_failure=fail),
response = ResetResponse(self, "reset", hydration_hooks)
self._append(b"\x0F", response=response,
dehydration_hooks=dehydration_hooks)
await self.send_all()
await self.fetch_all()
Expand Down
16 changes: 7 additions & 9 deletions src/neo4j/_async/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
check_supported_server_product,
CommitResponse,
InitResponse,
ResetResponse,
Response,
)

Expand Down Expand Up @@ -311,17 +312,14 @@ def rollback(self, dehydration_hooks=None, hydration_hooks=None,
dehydration_hooks=dehydration_hooks)

async def reset(self, dehydration_hooks=None, hydration_hooks=None):
""" Add a RESET message to the outgoing queue, send
it and consume all remaining messages.
"""

def fail(metadata):
raise BoltProtocolError("RESET failed %r" % metadata, self.unresolved_address)
"""Reset the connection.

Add a RESET message to the outgoing queue, send it and consume all
remaining messages.
"""
log.debug("[#%04X] C: RESET", self.local_port)
self._append(b"\x0F",
response=Response(self, "reset", hydration_hooks,
on_failure=fail),
response = ResetResponse(self, "reset", hydration_hooks)
self._append(b"\x0F", response=response,
dehydration_hooks=dehydration_hooks)
await self.send_all()
await self.fetch_all()
Expand Down
11 changes: 3 additions & 8 deletions src/neo4j/_async/io/_bolt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
CommitResponse,
InitResponse,
LogonResponse,
ResetResponse,
Response,
)

Expand Down Expand Up @@ -314,15 +315,9 @@ async def reset(self, dehydration_hooks=None, hydration_hooks=None):
Add a RESET message to the outgoing queue, send it and consume all
remaining messages.
"""

def fail(metadata):
raise BoltProtocolError("RESET failed %r" % metadata,
self.unresolved_address)

log.debug("[#%04X] C: RESET", self.local_port)
self._append(b"\x0F",
response=Response(self, "reset", hydration_hooks,
on_failure=fail),
response = ResetResponse(self, "reset", hydration_hooks)
self._append(b"\x0F", response=response,
dehydration_hooks=dehydration_hooks)
await self.send_all()
await self.fetch_all()
Expand Down
20 changes: 20 additions & 0 deletions src/neo4j/_async/io/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,26 @@ async def on_failure(self, metadata):
raise Neo4jError.hydrate(**metadata)


class ResetResponse(Response):
async def _unexpected_message(self, response):
log.warning("[#%04X] _: <CONNECTION> RESET received %s "
"(unexpected response) => dropping connection",
self.connection.local_port, response)
await self.connection.close()

async def on_records(self, records):
await self._unexpected_message("RECORD")

async def on_success(self, metadata):
pass

async def on_failure(self, metadata):
await self._unexpected_message("FAILURE")

async def on_ignored(self, metadata=None):
await self._unexpected_message("IGNORED")


class CommitResponse(Response):
pass

Expand Down
2 changes: 1 addition & 1 deletion src/neo4j/_sync/io/_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ def fetch_all(self):
messages fetched
"""
detail_count = summary_count = 0
while self.responses:
while not self._closed and self.responses:
response = self.responses[0]
while not response.complete:
detail_delta, summary_delta = self.fetch_message()
Expand Down
16 changes: 7 additions & 9 deletions src/neo4j/_sync/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
check_supported_server_product,
CommitResponse,
InitResponse,
ResetResponse,
Response,
)

Expand Down Expand Up @@ -391,17 +392,14 @@ def rollback(self, dehydration_hooks=None, hydration_hooks=None,
dehydration_hooks=dehydration_hooks)

def reset(self, dehydration_hooks=None, hydration_hooks=None):
""" Add a RESET message to the outgoing queue, send
it and consume all remaining messages.
"""

def fail(metadata):
raise BoltProtocolError("RESET failed %r" % metadata, address=self.unresolved_address)
"""Reset the connection.

Add a RESET message to the outgoing queue, send it and consume all
remaining messages.
"""
log.debug("[#%04X] C: RESET", self.local_port)
self._append(b"\x0F",
response=Response(self, "reset", hydration_hooks,
on_failure=fail),
response = ResetResponse(self, "reset", hydration_hooks)
self._append(b"\x0F", response=response,
dehydration_hooks=dehydration_hooks)
self.send_all()
self.fetch_all()
Expand Down
16 changes: 7 additions & 9 deletions src/neo4j/_sync/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
check_supported_server_product,
CommitResponse,
InitResponse,
ResetResponse,
Response,
)

Expand Down Expand Up @@ -311,17 +312,14 @@ def rollback(self, dehydration_hooks=None, hydration_hooks=None,
dehydration_hooks=dehydration_hooks)

def reset(self, dehydration_hooks=None, hydration_hooks=None):
""" Add a RESET message to the outgoing queue, send
it and consume all remaining messages.
"""

def fail(metadata):
raise BoltProtocolError("RESET failed %r" % metadata, self.unresolved_address)
"""Reset the connection.

Add a RESET message to the outgoing queue, send it and consume all
remaining messages.
"""
log.debug("[#%04X] C: RESET", self.local_port)
self._append(b"\x0F",
response=Response(self, "reset", hydration_hooks,
on_failure=fail),
response = ResetResponse(self, "reset", hydration_hooks)
self._append(b"\x0F", response=response,
dehydration_hooks=dehydration_hooks)
self.send_all()
self.fetch_all()
Expand Down
11 changes: 3 additions & 8 deletions src/neo4j/_sync/io/_bolt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
CommitResponse,
InitResponse,
LogonResponse,
ResetResponse,
Response,
)

Expand Down Expand Up @@ -314,15 +315,9 @@ def reset(self, dehydration_hooks=None, hydration_hooks=None):
Add a RESET message to the outgoing queue, send it and consume all
remaining messages.
"""

def fail(metadata):
raise BoltProtocolError("RESET failed %r" % metadata,
self.unresolved_address)

log.debug("[#%04X] C: RESET", self.local_port)
self._append(b"\x0F",
response=Response(self, "reset", hydration_hooks,
on_failure=fail),
response = ResetResponse(self, "reset", hydration_hooks)
self._append(b"\x0F", response=response,
dehydration_hooks=dehydration_hooks)
self.send_all()
self.fetch_all()
Expand Down
20 changes: 20 additions & 0 deletions src/neo4j/_sync/io/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,26 @@ def on_failure(self, metadata):
raise Neo4jError.hydrate(**metadata)


class ResetResponse(Response):
def _unexpected_message(self, response):
log.warning("[#%04X] _: <CONNECTION> RESET received %s "
"(unexpected response) => dropping connection",
self.connection.local_port, response)
self.connection.close()

def on_records(self, records):
self._unexpected_message("RECORD")

def on_success(self, metadata):
pass

def on_failure(self, metadata):
self._unexpected_message("FAILURE")

def on_ignored(self, metadata=None):
self._unexpected_message("IGNORED")


class CommitResponse(Response):
pass

Expand Down