Skip to content

Commit 040e69d

Browse files
committed
added check grants for shared cache queries
1 parent ebb7e32 commit 040e69d

File tree

5 files changed

+125
-7
lines changed

5 files changed

+125
-7
lines changed

cache/async_cache.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ type AsyncCache struct {
2020

2121
graceTime time.Duration
2222

23-
MaxPayloadSize config.ByteSize
24-
SharedWithAllUsers bool
23+
MaxPayloadSize config.ByteSize
24+
SharedWithAllUsers bool
25+
CheckGrantsForSharedCache bool
2526
}
2627

2728
func (c *AsyncCache) Close() error {
@@ -109,10 +110,11 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
109110
maxPayloadSize := cfg.MaxPayloadSize
110111

111112
return &AsyncCache{
112-
Cache: cache,
113-
TransactionRegistry: transaction,
114-
graceTime: graceTime,
115-
MaxPayloadSize: maxPayloadSize,
116-
SharedWithAllUsers: cfg.SharedWithAllUsers,
113+
Cache: cache,
114+
TransactionRegistry: transaction,
115+
graceTime: graceTime,
116+
MaxPayloadSize: maxPayloadSize,
117+
SharedWithAllUsers: cfg.SharedWithAllUsers,
118+
CheckGrantsForSharedCache: cfg.CheckGrantsForSharedCache,
117119
}, nil
118120
}

config/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ max_payload_size: <byte_size>
100100

101101
# Whether a query cached by a user can be used by another user
102102
shared_with_all_users: <bool> | default = false [optional]
103+
104+
# Whether `shared_with_all_users` option is enabled
105+
# check permissions for cached query used by another user
106+
check_grants_for_shared_cache: <bool> | default = false [optional]
103107
```
104108
105109
### <distributed_cache_config>

config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,10 @@ type Cache struct {
938938

939939
// Whether a query cached by a user could be used by another user
940940
SharedWithAllUsers bool `yaml:"shared_with_all_users,omitempty"`
941+
942+
// Whether `shared_with_all_users` option is enabled
943+
// check permissions for cached query used by another user
944+
CheckGrantsForSharedCache bool `yaml:"check_grants_for_shared_cache,omitempty"`
941945
}
942946

943947
func (c *Cache) setDefaults() {

io.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func (rw *statResponseWriter) Write(b []byte) (int, error) {
113113
func (rw *statResponseWriter) WriteHeader(statusCode int) {
114114
// cache statusCode to keep the opportunity to change it in further
115115
rw.statusCode = statusCode
116+
rw.SetStatusCode(statusCode)
116117
}
117118

118119
// CloseNotify implements http.CloseNotifier
@@ -177,3 +178,49 @@ func (crc *cachedReadCloser) String() string {
177178
crc.bLock.Unlock()
178179
return s
179180
}
181+
182+
var _ ResponseWriterWithCode = &checkGrantsResponseWriter{}
183+
184+
type checkGrantsResponseWriter struct {
185+
http.ResponseWriter
186+
187+
statusCode int
188+
}
189+
190+
func (rw *checkGrantsResponseWriter) SetStatusCode(code int) {
191+
rw.statusCode = code
192+
rw.ResponseWriter.WriteHeader(rw.statusCode)
193+
}
194+
195+
func (rw *checkGrantsResponseWriter) StatusCode() int {
196+
if rw.statusCode == 0 {
197+
return http.StatusOK
198+
}
199+
200+
return rw.statusCode
201+
}
202+
203+
func (rw *checkGrantsResponseWriter) WriteHeader(statusCode int) {
204+
// cache statusCode to keep the opportunity to change it in further
205+
rw.statusCode = statusCode
206+
rw.SetStatusCode(statusCode)
207+
}
208+
209+
func (rw *checkGrantsResponseWriter) Write(b []byte) (int, error) {
210+
if rw.statusCode == http.StatusOK {
211+
return 0, nil
212+
}
213+
214+
n, err := rw.ResponseWriter.Write(b)
215+
return n, err
216+
}
217+
218+
// CloseNotify implements http.CloseNotifier
219+
func (rw *checkGrantsResponseWriter) CloseNotify() <-chan bool {
220+
// The rw.ResponseWriter must implement http.CloseNotifier
221+
rwc, ok := rw.ResponseWriter.(http.CloseNotifier)
222+
if !ok {
223+
panic("BUG: the wrapped ResponseWriter must implement http.CloseNotifier")
224+
}
225+
return rwc.CloseNotify()
226+
}

proxy.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,25 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
146146
}
147147

148148
if shouldReturnFromCache {
149+
// if cache shared between all users
150+
// try to check if cached query is allowed for current user
151+
if s.user.cache != nil && s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache {
152+
checkReq, checkQuery, _ := s.createCheckGrantsRequest(req)
153+
154+
srwCheck := &checkGrantsResponseWriter{
155+
ResponseWriter: srw.ResponseWriter,
156+
}
157+
158+
rp.proxyRequest(s, srwCheck, srw, checkReq)
159+
160+
if srwCheck.statusCode == http.StatusOK {
161+
log.Debugf("%s: check grants for shared cached query request success; query: %q; Method: %s; URL: %q", s, checkQuery, checkReq.Method, checkReq.URL.String())
162+
} else {
163+
log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, checkQuery, checkReq.Method, checkReq.URL.String())
164+
return
165+
}
166+
}
167+
149168
rp.serveFromCache(s, srw, req, origParams, q)
150169
} else {
151170
rp.proxyRequest(s, srw, srw, req)
@@ -925,3 +944,45 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) {
925944
s.requestPacketSize = len(q)
926945
return s, 0, nil
927946
}
947+
948+
// create a new request based on proxied one
949+
// with query wrapped to fetch result types like:
950+
// 'DESC ({original_query})'
951+
// along with query parsed and analyzed for return types (which is fast)
952+
// ClickHouse check permissions to execute this query for the user
953+
func (s *scope) createCheckGrantsRequest(originalReq *http.Request) (*http.Request, string, error) {
954+
originalQuery := originalReq.URL.Query().Get("query")
955+
checkQuery := fmt.Sprintf("DESC (%s);", strings.TrimRight(originalQuery, ";"))
956+
957+
// Создаем новый URL, копируя оригинальный
958+
newURL := *originalReq.URL
959+
960+
// Парсим Query параметры
961+
queryParams, err := url.ParseQuery(newURL.RawQuery)
962+
if err != nil {
963+
return nil, checkQuery, err
964+
}
965+
966+
// Изменяем или добавляем параметр "test"
967+
queryParams.Set("query", checkQuery)
968+
969+
// Обновляем RawQuery в новом URL
970+
newURL.RawQuery = queryParams.Encode()
971+
972+
// Создаем новый запрос, используя оригинальный в качестве шаблона
973+
req := &http.Request{
974+
Method: originalReq.Method,
975+
URL: &newURL,
976+
Proto: originalReq.Proto,
977+
ProtoMajor: originalReq.ProtoMajor,
978+
ProtoMinor: originalReq.ProtoMinor,
979+
Header: originalReq.Header.Clone(),
980+
Body: originalReq.Body,
981+
Host: originalReq.Host,
982+
ContentLength: originalReq.ContentLength,
983+
Close: originalReq.Close,
984+
TLS: originalReq.TLS,
985+
}
986+
987+
return req, checkQuery, nil
988+
}

0 commit comments

Comments
 (0)