Redis 通訊協(xié)議(RESP)
RESP 協(xié)議
微信公眾號:運維開發(fā)故事,作者:老鄭
Redis 基于 RESP (Redis Serialization Protocal)協(xié)議來完成客戶端和服務端通訊的。RESP 本質(zhì)是一種文本協(xié)議,實現(xiàn)簡單、易于解析。如下表所示:
類型 | 協(xié)議描述 | 實例 |
客戶端和服務端通過 tcp/流式套接字來進行通訊,為了 防止粘包 因此命令或數(shù)據(jù)均以 \r\n (CRLF) 結尾 | +ok\r\n | |
請求 | *<參數(shù)數(shù)量> CR LF <參數(shù)的數(shù)據(jù)> CR LF <參數(shù) N 的字節(jié)數(shù)量 >CR LF | *2\r\n3\r\nget\r\n$13\r\nusername:1234\r\n。 見 callSendCommond -> redis AppendConnadnArgv -> redisFromatCommandArgv |
簡單字符串回復 | 第一個字節(jié)+ | +ok\r\n |
錯誤回復 | 第一個字節(jié)- | -ERR unknown command 'sa' \r\n |
整數(shù)回復 | 第一個字節(jié): | :0\r\n |
批量回復 | 第一個字節(jié)$ | $6\r\nfoobar\r\n 空回復 $-1 |
多條批量 回復 | 第一個字節(jié)* | 5\r\n:1\r\n:2\r\n:3\r\n:4\r\n$6\r\nfoobar\r\n, 空回復 0\r\n |
如果客戶端和服務端在一臺機器上。那么會對通訊協(xié)議進行優(yōu)化,直接走本地回環(huán)
我們可以通過 tcpdump 命令來抓取客戶端和服務端請求、響應的數(shù)據(jù)包, 命令如下:
# linuxtcpdump -i lo part 6379 -Ann# mac tcpdump -i lo0 port 6379 -Ann
我們以一條 `set msg100 1` 這條命令測試一下 ( 我本機是 mac 環(huán)境):
# 客戶端 A127.0.0.1:6379> set msg100 1OK
服務端抓包結果如下所示:
? ~ sudo tcpdump -i lo0 port 6379 -AnnPassword:tcpdump: verbose output suppressed, use -v or -vv for full protocol decodelistening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes21:52:53.447885 IP 127.0.0.1.51645 > 127.0.0.1.6379: Flags [P.], seq 1564111974:1564112006, ack 169183468, win 6272, options [nop,nop,TS val 774447713 ecr 772455554], length 32: RESP "set" "msg100" "1"E..T..@.@...............]:tf........H......)"a...*3$3set$6msg100$1121:52:53.447912 IP 127.0.0.1.6379 > 127.0.0.1.51645: Flags [.], ack 32, win 6376, options [nop,nop,TS val 774447713 ecr 774447713], length 0E..4..@.@..................]:t......(......)"a.)"a21:52:53.528935 IP 127.0.0.1.6379 > 127.0.0.1.51645: Flags [P.], seq 1:6, ack 32, win 6376, options [nop,nop,TS val 774447793 ecr 774447713], length 5: RESP "OK"E..9..@.@..................]:t......-......)"..)"a+OK21:52:53.528966 IP 127.0.0.1.51645 > 127.0.0.1.6379: Flags [.], ack 6, win 6272, options [nop,nop,TS val 774447793 ecr 774447793], length 0E..4..@.@...............]:t.........(......)"..)".
redis-cli 客戶端效果:
客戶端是對顯示結果做了轉(zhuǎn)化,在 redis-cli.c 文件中下面是它的部分源碼
static sds cliFormatReplyTTY(redisReply *r, char *prefix) { sds out = sdsempty(); switch (r->type) { case REDIS_REPLY_ERROR: out = sdscatprintf(out,"(error) %s\n", r->str); break; case REDIS_REPLY_STATUS: out = sdscat(out,r->str); out = sdscat(out,"\n"); break; case REDIS_REPLY_INTEGER: out = sdscatprintf(out,"(integer) %lld\n",r->integer); break; case REDIS_REPLY_DOUBLE: out = sdscatprintf(out,"(double) %s\n",r->str); break; case REDIS_REPLY_STRING: case REDIS_REPLY_VERB: /* If you are producing output for the standard output we want * a more interesting output with quoted characters and so forth, * unless it's a verbatim string type. */ if (r->type == REDIS_REPLY_STRING) { out = sdscatrepr(out,r->str,r->len); out = sdscat(out,"\n"); } else { out = sdscatlen(out,r->str,r->len); out = sdscat(out,"\n"); } break; case REDIS_REPLY_NIL: out = sdscat(out,"(nil)\n"); break; case REDIS_REPLY_BOOL: out = sdscat(out,r->integer ? "(true)\n" : "(false)\n"); break; case REDIS_REPLY_ARRAY: case REDIS_REPLY_MAP: case REDIS_REPLY_SET: case REDIS_REPLY_PUSH: if (r->elements == 0) { if (r->type == REDIS_REPLY_ARRAY) out = sdscat(out,"(empty array)\n"); else if (r->type == REDIS_REPLY_MAP) out = sdscat(out,"(empty hash)\n"); else if (r->type == REDIS_REPLY_SET) out = sdscat(out,"(empty set)\n"); else if (r->type == REDIS_REPLY_PUSH) out = sdscat(out,"(empty push)\n"); else out = sdscat(out,"(empty aggregate type)\n"); } else { unsigned int i, idxlen = 0; char _prefixlen[16]; char _prefixfmt[16]; sds _prefix; sds tmp; /* Calculate chars needed to represent the largest index */ i = r->elements; if (r->type == REDIS_REPLY_MAP) i /= 2; do { idxlen++; i /= 10; } while(i); /* Prefix for nested multi bulks should grow with idxlen+2 spaces */ memset(_prefixlen,' ',idxlen+2); _prefixlen[idxlen+2] = '\0'; _prefix = sdscat(sdsnew(prefix),_prefixlen); /* Setup prefix format for every entry */ char numsep; if (r->type == REDIS_REPLY_SET) numsep = '~'; else if (r->type == REDIS_REPLY_MAP) numsep = '#'; else numsep = ')'; snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep); for (i = 0; i < r->elements; i++) { unsigned int human_idx = (r->type == REDIS_REPLY_MAP) ? i/2 : i; human_idx++; /* Make it 1-based. */ /* Don't use the prefix for the first element, as the parent * caller already prepended the index number. */ out = sdscatprintf(out,_prefixfmt,i == 0 ? "" : prefix,human_idx); /* Format the multi bulk entry */ tmp = cliFormatReplyTTY(r->element[i],_prefix); out = sdscatlen(out,tmp,sdslen(tmp)); sdsfree(tmp); /* For maps, format the value as well. */ if (r->type == REDIS_REPLY_MAP) { i++; sdsrange(out,0,-2); out = sdscat(out," => "); tmp = cliFormatReplyTTY(r->element[i],_prefix); out = sdscatlen(out,tmp,sdslen(tmp)); sdsfree(tmp); } } sdsfree(_prefix); } break; default: fprintf(stderr,"Unknown reply type: %d\n", r->type); exit(1); } return out;}
我們也可以使用 nc 命令來替代 redis-cli 命令行:
? ~ sudo nc 127.0.0.1 6379set a a+OKget a$1a
錯誤代碼
- Redis 常見的錯誤代碼定義如下:
#define REDIS_ERR -1#define REDIS_OK 0/* When an error occurs, the err flag in a context is set to hold the type of * error that occurred. REDIS_ERR_IO means there was an I/O error and you * should use the "errno" variable to find out what is wrong. * For other values, the "errstr" field will hold a description. */#define REDIS_ERR_IO 1 /* Error in read or write */#define REDIS_ERR_EOF 3 /* End of file */#define REDIS_ERR_PROTOCOL 4 /* Protocol error */#define REDIS_ERR_OOM 5 /* Out of memory */#define REDIS_ERR_TIMEOUT 6 /* Timed out */#define REDIS_ERR_OTHER 2 /* Everything else... */#define REDIS_REPLY_STRING 1#define REDIS_REPLY_ARRAY 2#define REDIS_REPLY_INTEGER 3#define REDIS_REPLY_NIL 4#define REDIS_REPLY_STATUS 5#define REDIS_REPLY_ERROR 6#define REDIS_REPLY_DOUBLE 7#define REDIS_REPLY_BOOL 8#define REDIS_REPLY_MAP 9#define REDIS_REPLY_SET 10#define REDIS_REPLY_ATTR 11#define REDIS_REPLY_PUSH 12#define REDIS_REPLY_BIGNUM 13#define REDIS_REPLY_VERB 14/* Default max unused reader buffer. */#define REDIS_READER_MAX_BUF (1024*16)/* Default multi-bulk element limit */#define REDIS_READER_MAX_ARRAY_ELEMENTS ((1LL<<32) - 1)
- 字符串錯誤信息 - 共享對象
void createSharedObjects(void) { int j; /* Shared command responses */ shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n")); shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n")); shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n")); shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n")); shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n")); shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n")); shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n")); shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n")); shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n")); shared.space = createObject(OBJ_STRING,sdsnew(" ")); shared.colon = createObject(OBJ_STRING,sdsnew(":")); shared.plus = createObject(OBJ_STRING,sdsnew("+")); /* Shared command error responses */ shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew( "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")); shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n")); shared.nokeyerr = createObject(OBJ_STRING,sdsnew( "-ERR no such key\r\n")); shared.syntaxerr = createObject(OBJ_STRING,sdsnew( "-ERR syntax error\r\n")); shared.sameobjecterr = createObject(OBJ_STRING,sdsnew( "-ERR source and destination objects are the same\r\n")); shared.outofrangeerr = createObject(OBJ_STRING,sdsnew( "-ERR index out of range\r\n")); shared.noscripterr = createObject(OBJ_STRING,sdsnew( "-NOSCRIPT No matching script. Please use EVAL.\r\n")); shared.loadingerr = createObject(OBJ_STRING,sdsnew( "-LOADING Redis is loading the dataset in memory\r\n")); shared.slowscripterr = createObject(OBJ_STRING,sdsnew( "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n")); shared.masterdownerr = createObject(OBJ_STRING,sdsnew( "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n")); shared.bgsaveerr = createObject(OBJ_STRING,sdsnew( "-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n")); shared.roslaveerr = createObject(OBJ_STRING,sdsnew( "-READONLY You can't write against a read only replica.\r\n")); shared.noautherr = createObject(OBJ_STRING,sdsnew( "-NOAUTH Authentication required.\r\n")); shared.oomerr = createObject(OBJ_STRING,sdsnew( "-OOM command not allowed when used memory > 'maxmemory'.\r\n")); shared.execaborterr = createObject(OBJ_STRING,sdsnew( "-EXECABORT Transaction discarded because of previous errors.\r\n")); shared.noreplicaserr = createObject(OBJ_STRING,sdsnew( "-NOREPLICAS Not enough good replicas to write.\r\n")); shared.busykeyerr = createObject(OBJ_STRING,sdsnew( "-BUSYKEY Target key name already exists.\r\n")); /* The shared NULL depends on the protocol version. */ shared.null[0] = NULL; shared.null[1] = NULL; shared.null[2] = createObject(OBJ_STRING,sdsnew("$-1\r\n")); shared.null[3] = createObject(OBJ_STRING,sdsnew("_\r\n")); shared.nullarray[0] = NULL; shared.nullarray[1] = NULL; shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n")); shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n")); shared.emptymap[0] = NULL; shared.emptymap[1] = NULL; shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0\r\n")); shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0\r\n")); shared.emptyset[0] = NULL; shared.emptyset[1] = NULL; shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0\r\n")); shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0\r\n")); for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) { char dictid_str[64]; int dictid_len; dictid_len = ll2string(dictid_str,sizeof(dictid_str),j); shared.select[j] = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, dictid_str)); } shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); /* Shared command names */ shared.del = createStringObject("DEL",3); shared.unlink = createStringObject("UNLINK",6); shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); shared.lpush = createStringObject("LPUSH",5); shared.rpoplpush = createStringObject("RPOPLPUSH",9); shared.lmove = createStringObject("LMOVE",5); shared.blmove = createStringObject("BLMOVE",6); shared.zpopmin = createStringObject("ZPOPMIN",7); shared.zpopmax = createStringObject("ZPOPMAX",7); shared.multi = createStringObject("MULTI",5); shared.exec = createStringObject("EXEC",4); shared.hset = createStringObject("HSET",4); shared.srem = createStringObject("SREM",4); shared.xgroup = createStringObject("XGROUP",6); shared.xclaim = createStringObject("XCLAIM",6); shared.script = createStringObject("SCRIPT",6); shared.replconf = createStringObject("REPLCONF",8); shared.pexpireat = createStringObject("PEXPIREAT",9); shared.pexpire = createStringObject("PEXPIRE",7); shared.persist = createStringObject("PERSIST",7); shared.set = createStringObject("SET",3); shared.eval = createStringObject("EVAL",4); /* Shared command argument */ shared.left = createStringObject("left",4); shared.right = createStringObject("right",5); shared.pxat = createStringObject("PXAT", 4); shared.px = createStringObject("PX",2); shared.time = createStringObject("TIME",4); shared.retrycount = createStringObject("RETRYCOUNT",10); shared.force = createStringObject("FORCE",5); shared.justid = createStringObject("JUSTID",6); shared.lastid = createStringObject("LASTID",6); shared.default_username = createStringObject("default",7); shared.ping = createStringObject("ping",4); shared.setid = createStringObject("SETID",5); shared.keepttl = createStringObject("KEEPTTL",7); shared.load = createStringObject("LOAD",4); shared.createconsumer = createStringObject("CREATECONSUMER",14); shared.getack = createStringObject("GETACK",6); shared.special_asterick = createStringObject("*",1); shared.special_equals = createStringObject("=",1); shared.redacted = makeObjectShared(createStringObject("(redacted)",10)); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); shared.integers[j]->encoding = OBJ_ENCODING_INT; } for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) { shared.mbulkhdr[j] = createObject(OBJ_STRING, sdscatprintf(sdsempty(),"*%d\r\n",j)); shared.bulkhdr[j] = createObject(OBJ_STRING, sdscatprintf(sdsempty(),"$%d\r\n",j)); } /* The following two shared objects, minstring and maxstrings, are not * actually used for their value but as a special object meaning * respectively the minimum possible string and the maximum possible * string in string comparisons for the ZRANGEBYLEX command. */ shared.minstring = sdsnew("minstring"); shared.maxstring = sdsnew("maxstring");}
命令對象
redis 命令是使用的是 redisCommand 數(shù)據(jù)結構來管理的。
typedef void redisCommandProc(client *c);// 函數(shù)指針類型,指向命令實現(xiàn)函數(shù)typedef int redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);struct redisCommand { char *name; redisCommandProc *proc; // 限制命令的個數(shù)。-N 表示至少 N 個參數(shù),包含命令本身 int arity; // 字符串方式設置命令的屬性之間運用 | 運算,程序內(nèi)部自動解析,函數(shù) populateCommandTable char *sflags; /* Flags as string representation, one char per flag. */ // 將 flags 字符串類型轉(zhuǎn)換成整數(shù),多個屬性 uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */ /* Use a function to determine keys arguments in a command line. * Used for Redis Cluster redirect. */ redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ int lastkey; /* The last argument that's a key */ int keystep; /* The step between first and last key */ long long microseconds, calls, rejected_calls, failed_calls; int id; /* Command ID. This is a progressive ID starting from 0 that is assigned at runtime, and is used in order to check ACLs. A connection is able to execute a given command if the user associated to the connection has this command bit set in the bitmap of allowed commands. */};
針對 sflag 標示,這里可以看看 《redis 設計與實現(xiàn)》
flag 記錄的是 flag 值與 sflag 進行運算的結果,見 populateCommandTable
函數(shù)
for (int j = 0; j < argc; j++) { char *flag = argv[j]; if (!strcasecmp(flag,"write")) { c->flags |= CMD_WRITE|CMD_CATEGORY_WRITE; } else if (!strcasecmp(flag,"read-only")) { c->flags |= CMD_READONLY|CMD_CATEGORY_READ; } else if (!strcasecmp(flag,"use-memory")) { c->flags |= CMD_DENYOOM; } else if (!strcasecmp(flag,"admin")) { c->flags |= CMD_ADMIN|CMD_CATEGORY_ADMIN|CMD_CATEGORY_DANGEROUS; } else if (!strcasecmp(flag,"pub-sub")) { c->flags |= CMD_PUBSUB|CMD_CATEGORY_PUBSUB; } else if (!strcasecmp(flag,"no-script")) { c->flags |= CMD_NOSCRIPT; } else if (!strcasecmp(flag,"random")) { c->flags |= CMD_RANDOM; } else if (!strcasecmp(flag,"to-sort")) { c->flags |= CMD_SORT_FOR_SCRIPT; } else if (!strcasecmp(flag,"ok-loading")) { c->flags |= CMD_LOADING; } else if (!strcasecmp(flag,"ok-stale")) { c->flags |= CMD_STALE; } else if (!strcasecmp(flag,"no-monitor")) { c->flags |= CMD_SKIP_MONITOR; } else if (!strcasecmp(flag,"no-slowlog")) { c->flags |= CMD_SKIP_SLOWLOG; } else if (!strcasecmp(flag,"cluster-asking")) { c->flags |= CMD_ASKING; } else if (!strcasecmp(flag,"fast")) { c->flags |= CMD_FAST | CMD_CATEGORY_FAST; } else if (!strcasecmp(flag,"no-auth")) { c->flags |= CMD_NO_AUTH; } else if (!strcasecmp(flag,"may-replicate")) { c->flags |= CMD_MAY_REPLICATE; } else { /* Parse ACL categories here if the flag name starts with @. */ uint64_t catflag; if (flag[0] == '@' && (catflag = ACLGetCommandCategoryFlagByName(flag+1)) != 0) { c->flags |= catflag; } else { sdsfreesplitres(argv,argc); return C_ERR; } } }
具體命令比較多
struct redisCommand redisCommandTable[] = { {"module",moduleCommand,-2, "admin no-script", 0,NULL,0,0,0,0,0,0}, {"get",getCommand,2, "read-only fast @string", 0,NULL,1,1,1,0,0,0},}
以 set 為例子 {“set”,setCommand,-3, “write use-memory @string”, 0,NULL,1,1,1,0,0,0}
參考資料
- 《Redis 設計與實現(xiàn)》黃健宏
作者:老鄭 運維開發(fā)故事
歡迎關注:運維開發(fā)故事