@@ -1065,37 +1065,49 @@ void DcpConnMap::closeAllStreams_UNLOCKED() {
1065
1065
}
1066
1066
1067
1067
void DcpConnMap::disconnect (const void *cookie) {
1068
- LockHolder lh (connsLock);
1069
- disconnect_UNLOCKED (cookie);
1070
- }
1071
-
1072
- void DcpConnMap::disconnect_UNLOCKED (const void *cookie) {
1073
- std::list<connection_t >::iterator iter;
1074
- for (iter = all.begin (); iter != all.end (); ++iter) {
1075
- if ((*iter)->getCookie () == cookie) {
1076
- (*iter)->setDisconnect (true );
1077
- all.erase (iter);
1078
- break ;
1068
+ // Move the connection matching this cookie from the `all` and map_
1069
+ // data structures (under connsLock).
1070
+ connection_t conn;
1071
+ {
1072
+ LockHolder lh (connsLock);
1073
+ std::list<connection_t >::iterator iter;
1074
+ for (iter = all.begin (); iter != all.end (); ++iter) {
1075
+ if ((*iter)->getCookie () == cookie) {
1076
+ (*iter)->setDisconnect (true );
1077
+ all.erase (iter);
1078
+ break ;
1079
+ }
1079
1080
}
1080
- }
1081
-
1082
- std::map<const void *, connection_t >::iterator itr (map_.find (cookie));
1083
- if (itr != map_.end ()) {
1084
- connection_t conn = itr->second ;
1085
- if (conn.get ()) {
1086
- LOG (EXTENSION_LOG_INFO, " %s Removing connection" ,
1087
- conn->logHeader ());
1088
- map_.erase (itr);
1081
+ std::map<const void *, connection_t >::iterator itr (map_.find (cookie));
1082
+ if (itr != map_.end ()) {
1083
+ conn = itr->second ;
1084
+ if (conn.get ()) {
1085
+ LOG (EXTENSION_LOG_INFO, " %s Removing connection" ,
1086
+ conn->logHeader ());
1087
+ map_.erase (itr);
1088
+ }
1089
1089
}
1090
+ }
1090
1091
1092
+ // Note we shutdown the stream *not* under the connsLock; this is
1093
+ // because as part of closing a DcpConsumer stream we need to
1094
+ // acquire PassiveStream::buffer.bufMutex; and that could deadlock
1095
+ // in EventuallyPersistentStore::setVBucketState, via
1096
+ // PassiveStream::processBufferedMessages.
1097
+ if (conn) {
1091
1098
DcpProducer* producer = dynamic_cast <DcpProducer*> (conn.get ());
1092
1099
if (producer) {
1093
1100
producer->closeAllStreams ();
1094
1101
producer->cancelCheckpointProcessorTask ();
1095
1102
} else {
1096
1103
static_cast <DcpConsumer*>(conn.get ())->closeAllStreams ();
1097
1104
}
1105
+ }
1098
1106
1107
+ // Finished disconnecting the stream; add it to the
1108
+ // deadConnections list.
1109
+ if (conn) {
1110
+ LockHolder lh (connsLock);
1099
1111
deadConnections.push_back (conn);
1100
1112
}
1101
1113
}
0 commit comments