@@ -377,27 +377,50 @@ size_t RedisCommandParser::ParsedArgsSize() {
377377ParseError RedisCommandParser::Consume (butil::IOBuf& buf,
378378 std::vector<butil::StringPiece>* args,
379379 butil::Arena* arena) {
380+ ParseError err = PARSE_OK;
381+ do {
382+ RedisCommandConsumeState state = ConsumeImpl (buf, arena, &err);
383+ if (state == CONSUME_STATE_CONTINUE) {
384+ continue ;
385+ } else if (state == CONSUME_STATE_DONE) {
386+ break ;
387+ } else {
388+ return err;
389+ }
390+ } while (true );
391+
392+ args->swap (_args);
393+ Reset ();
394+ return PARSE_OK;
395+ }
396+
397+ RedisCommandConsumeState RedisCommandParser::ConsumeImpl (butil::IOBuf& buf,
398+ butil::Arena* arena,
399+ ParseError* err) {
380400 const auto pfc = static_cast <const char *>(buf.fetch1 ());
381401 if (pfc == NULL ) {
382- return PARSE_ERROR_NOT_ENOUGH_DATA;
402+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
403+ return CONSUME_STATE_ERROR;
383404 }
384405 // '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
385406 if (!_parsing_array && *pfc != ' *' ) {
386407 if (!std::isalpha (static_cast <unsigned char >(*pfc))) {
387- return PARSE_ERROR_TRY_OTHERS;
408+ *err = PARSE_ERROR_TRY_OTHERS;
409+ return CONSUME_STATE_ERROR;
388410 }
389411 const size_t buf_size = buf.size ();
390412 const auto copy_str = static_cast <char *>(arena->allocate (buf_size + 1 ));
391413 buf.copy_to (copy_str, buf_size);
392414 if (*copy_str == ' ' ) {
393- return PARSE_ERROR_ABSOLUTELY_WRONG;
415+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
416+ return CONSUME_STATE_ERROR;
394417 }
395418 copy_str[buf_size] = ' \0 ' ;
396419 const size_t crlf_pos = butil::StringPiece (copy_str, buf_size).find (" \r\n " );
397420 if (crlf_pos == butil::StringPiece::npos) { // not enough data
398- return PARSE_ERROR_NOT_ENOUGH_DATA;
421+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
422+ return CONSUME_STATE_ERROR;
399423 }
400- args->clear ();
401424 size_t offset = 0 ;
402425 while (offset < crlf_pos && copy_str[offset] != ' ' ) {
403426 ++offset;
@@ -407,11 +430,11 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
407430 for (size_t i = 0 ; i < offset; ++i) {
408431 first_arg[i] = tolower (first_arg[i]);
409432 }
410- args-> push_back (butil::StringPiece (first_arg, offset));
433+ _args. push_back (butil::StringPiece (first_arg, offset));
411434 if (offset == crlf_pos) {
412435 // only one argument, directly return
413436 buf.pop_front (crlf_pos + 2 );
414- return PARSE_OK ;
437+ return CONSUME_STATE_DONE ;
415438 }
416439 size_t arg_start_pos = ++offset;
417440
@@ -422,7 +445,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
422445 const auto arg_length = offset - arg_start_pos;
423446 const auto arg = static_cast <char *>(arena->allocate (arg_length));
424447 memcpy (arg, copy_str + arg_start_pos, arg_length);
425- args-> push_back (butil::StringPiece (arg, arg_length));
448+ _args. push_back (butil::StringPiece (arg, arg_length));
426449 arg_start_pos = ++offset;
427450 }
428451
@@ -431,61 +454,69 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
431454 const auto arg_length = crlf_pos - arg_start_pos;
432455 const auto arg = static_cast <char *>(arena->allocate (arg_length));
433456 memcpy (arg, copy_str + arg_start_pos, arg_length);
434- args-> push_back (butil::StringPiece (arg, arg_length));
457+ _args. push_back (butil::StringPiece (arg, arg_length));
435458 }
436459
437460 buf.pop_front (crlf_pos + 2 );
438- return PARSE_OK ;
461+ return CONSUME_STATE_DONE ;
439462 }
440463 // '$' stands for bulk string "$<length>\r\n<string>\r\n"
441464 if (_parsing_array && *pfc != ' $' ) {
442- return PARSE_ERROR_ABSOLUTELY_WRONG;
465+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
466+ return CONSUME_STATE_ERROR;
443467 }
444468 char intbuf[32 ]; // enough for fc + 64-bit decimal + \r\n
445469 const size_t ncopied = buf.copy_to (intbuf, sizeof (intbuf) - 1 );
446470 intbuf[ncopied] = ' \0 ' ;
447471 const size_t crlf_pos = butil::StringPiece (intbuf, ncopied).find (" \r\n " );
448472 if (crlf_pos == butil::StringPiece::npos) { // not enough data
449- return PARSE_ERROR_NOT_ENOUGH_DATA;
473+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
474+ return CONSUME_STATE_ERROR;
450475 }
451476 char * endptr = NULL ;
452477 int64_t value = strtoll (intbuf + 1 /* skip fc*/ , &endptr, 10 );
453478 if (endptr != intbuf + crlf_pos) {
454479 LOG (ERROR) << ' `' << intbuf + 1 << " ' is not a valid 64-bit decimal" ;
455- return PARSE_ERROR_ABSOLUTELY_WRONG;
480+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
481+ return CONSUME_STATE_ERROR;
456482 }
457483 if (value < 0 ) {
458484 LOG (ERROR) << " Invalid len=" << value << " in redis command" ;
459- return PARSE_ERROR_ABSOLUTELY_WRONG;
485+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
486+ return CONSUME_STATE_ERROR;
460487 }
461488 if (!_parsing_array) {
462489 if (value > (int64_t )(FLAGS_redis_max_allocation_size / sizeof (butil::StringPiece))) {
463- LOG (ERROR) << " command array size exceeds limit! max="
464- << (FLAGS_redis_max_allocation_size / sizeof (butil::StringPiece))
490+ LOG (ERROR) << " command array size exceeds limit! max="
491+ << (FLAGS_redis_max_allocation_size / sizeof (butil::StringPiece))
465492 << " , actually=" << value;
466- return PARSE_ERROR_ABSOLUTELY_WRONG;
493+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
494+ return CONSUME_STATE_ERROR;
467495 }
468496 buf.pop_front (crlf_pos + 2 /* CRLF*/ );
469497 _parsing_array = true ;
470498 _length = value;
471499 _index = 0 ;
472500 _args.resize (value);
473- return Consume (buf, args, arena) ;
501+ return CONSUME_STATE_CONTINUE ;
474502 }
475503 CHECK (_index < _length) << " a complete command has been parsed. "
476- " impl of RedisCommandParser::Parse is buggy" ;
504+ " impl of RedisCommandParser::Parse is buggy" ;
477505 const int64_t len = value; // `value' is length of the string
478506 if (len < 0 ) {
479507 LOG (ERROR) << " string in command is nil!" ;
480- return PARSE_ERROR_ABSOLUTELY_WRONG;
508+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
509+ return CONSUME_STATE_ERROR;
481510 }
482511 if (len > FLAGS_redis_max_allocation_size) {
483- LOG (ERROR) << " command string exceeds max allocation size! max="
512+ LOG (ERROR) << " command string exceeds max allocation size! max="
484513 << FLAGS_redis_max_allocation_size << " , actually=" << len;
485- return PARSE_ERROR_ABSOLUTELY_WRONG;
514+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
515+ return CONSUME_STATE_ERROR;
486516 }
487517 if (buf.size () < crlf_pos + 2 + (size_t )len + 2 /* CRLF*/ ) {
488- return PARSE_ERROR_NOT_ENOUGH_DATA;
518+ *err = PARSE_ERROR_NOT_ENOUGH_DATA;
519+ return CONSUME_STATE_ERROR;
489520 }
490521 buf.pop_front (crlf_pos + 2 /* CRLF*/ );
491522 char * d = (char *)arena->allocate ((len/8 + 1 ) * 8 );
@@ -502,14 +533,13 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
502533 buf.cutn (crlf, sizeof (crlf));
503534 if (crlf[0 ] != ' \r ' || crlf[1 ] != ' \n ' ) {
504535 LOG (ERROR) << " string in command is not ended with CRLF" ;
505- return PARSE_ERROR_ABSOLUTELY_WRONG;
536+ *err = PARSE_ERROR_ABSOLUTELY_WRONG;
537+ return CONSUME_STATE_ERROR;
506538 }
507- if (++_index < _length) {
508- return Consume (buf, args, arena) ;
539+ if (++_index == _length) {
540+ return CONSUME_STATE_DONE ;
509541 }
510- args->swap (_args);
511- Reset ();
512- return PARSE_OK;
542+ return CONSUME_STATE_CONTINUE;
513543}
514544
515545void RedisCommandParser::Reset () {
0 commit comments