Skip to content

Commit fd31df0

Browse files
committed
updated batch aggregator, fixed extractcommandvalue
1 parent d9164d3 commit fd31df0

File tree

3 files changed

+386
-48
lines changed

3 files changed

+386
-48
lines changed

command.go

Lines changed: 251 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,33 @@ const (
153153
)
154154
>>>>>>> b6633bf9 (centralize cluster command routing in osscluster_router.go and refactor osscluster.go (#6))
155155

156+
type (
157+
CmdTypeXAutoClaimValue struct {
158+
messages []XMessage
159+
start string
160+
}
161+
162+
CmdTypeXAutoClaimJustIDValue struct {
163+
ids []string
164+
start string
165+
}
166+
167+
CmdTypeScanValue struct {
168+
keys []string
169+
cursor uint64
170+
}
171+
172+
CmdTypeKeyValuesValue struct {
173+
key string
174+
values []string
175+
}
176+
177+
CmdTypeZSliceWithKeyValue struct {
178+
key string
179+
zSlice []Z
180+
}
181+
)
182+
156183
type Cmder interface {
157184
// command name.
158185
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster".
@@ -6981,6 +7008,10 @@ func ExtractCommandValue(cmd interface{}) interface{} {
69817008

69827009
// Use fast type-based extraction
69837010
switch cmdType {
7011+
case CmdTypeGeneric:
7012+
if genericCmd, ok := cmd.(interface{ Val() interface{} }); ok {
7013+
return genericCmd.Val()
7014+
}
69847015
case CmdTypeString:
69857016
if stringCmd, ok := cmd.(interface{ Val() string }); ok {
69867017
return stringCmd.Val()
@@ -7001,21 +7032,217 @@ func ExtractCommandValue(cmd interface{}) interface{} {
70017032
if statusCmd, ok := cmd.(interface{ Val() string }); ok {
70027033
return statusCmd.Val()
70037034
}
7004-
case CmdTypeDuration, CmdTypeTime, CmdTypeStringStructMap, CmdTypeXMessageSlice,
7005-
CmdTypeXStreamSlice, CmdTypeXPending, CmdTypeXPendingExt, CmdTypeXAutoClaim,
7006-
CmdTypeXAutoClaimJustID, CmdTypeXInfoConsumers, CmdTypeXInfoGroups, CmdTypeXInfoStream,
7007-
CmdTypeXInfoStreamFull, CmdTypeZSlice, CmdTypeZWithKey, CmdTypeScan, CmdTypeClusterSlots,
7008-
CmdTypeGeoSearchLocation, CmdTypeGeoPos, CmdTypeCommandsInfo, CmdTypeSlowLog,
7009-
CmdTypeKeyValues, CmdTypeZSliceWithKey, CmdTypeFunctionList, CmdTypeFunctionStats,
7010-
CmdTypeLCS, CmdTypeKeyFlags, CmdTypeClusterLinks, CmdTypeClusterShards,
7011-
CmdTypeRankWithScore, CmdTypeClientInfo, CmdTypeACLLog, CmdTypeInfo, CmdTypeMonitor,
7012-
CmdTypeJSON, CmdTypeJSONSlice, CmdTypeIntPointerSlice, CmdTypeScanDump, CmdTypeBFInfo,
7013-
CmdTypeCFInfo, CmdTypeCMSInfo, CmdTypeTopKInfo, CmdTypeTDigestInfo, CmdTypeFTSearch,
7014-
CmdTypeFTInfo, CmdTypeFTSpellCheck, CmdTypeFTSynDump, CmdTypeAggregate,
7015-
CmdTypeTSTimestampValue, CmdTypeTSTimestampValueSlice:
7016-
if durationCmd, ok := cmd.(interface{ Val() interface{} }); ok {
7035+
case CmdTypeDuration:
7036+
if durationCmd, ok := cmd.(interface{ Val() time.Duration }); ok {
70177037
return durationCmd.Val()
70187038
}
7039+
case CmdTypeTime:
7040+
if timeCmd, ok := cmd.(interface{ Val() time.Time }); ok {
7041+
return timeCmd.Val()
7042+
}
7043+
case CmdTypeStringStructMap:
7044+
if structMapCmd, ok := cmd.(interface{ Val() map[string]struct{} }); ok {
7045+
return structMapCmd.Val()
7046+
}
7047+
case CmdTypeXMessageSlice:
7048+
if xMessageSliceCmd, ok := cmd.(interface{ Val() []XMessage }); ok {
7049+
return xMessageSliceCmd.Val()
7050+
}
7051+
case CmdTypeXStreamSlice:
7052+
if xStreamSliceCmd, ok := cmd.(interface{ Val() []XStream }); ok {
7053+
return xStreamSliceCmd.Val()
7054+
}
7055+
case CmdTypeXPending:
7056+
if xPendingCmd, ok := cmd.(interface{ Val() *XPending }); ok {
7057+
return xPendingCmd.Val()
7058+
}
7059+
case CmdTypeXPendingExt:
7060+
if xPendingExtCmd, ok := cmd.(interface{ Val() []XPendingExt }); ok {
7061+
return xPendingExtCmd.Val()
7062+
}
7063+
case CmdTypeXAutoClaim:
7064+
if xAutoClaimCmd, ok := cmd.(interface{ Val() ([]XMessage, string) }); ok {
7065+
messages, start := xAutoClaimCmd.Val()
7066+
return CmdTypeXAutoClaimValue{messages: messages, start: start}
7067+
}
7068+
case CmdTypeXAutoClaimJustID:
7069+
if xAutoClaimJustIDCmd, ok := cmd.(interface{ Val() ([]string, string) }); ok {
7070+
ids, start := xAutoClaimJustIDCmd.Val()
7071+
return CmdTypeXAutoClaimJustIDValue{ids: ids, start: start}
7072+
}
7073+
case CmdTypeXInfoConsumers:
7074+
if xInfoConsumersCmd, ok := cmd.(interface{ Val() []XInfoConsumer }); ok {
7075+
return xInfoConsumersCmd.Val()
7076+
}
7077+
case CmdTypeXInfoGroups:
7078+
if xInfoGroupsCmd, ok := cmd.(interface{ Val() []XInfoGroup }); ok {
7079+
return xInfoGroupsCmd.Val()
7080+
}
7081+
case CmdTypeXInfoStream:
7082+
if xInfoStreamCmd, ok := cmd.(interface{ Val() *XInfoStream }); ok {
7083+
return xInfoStreamCmd.Val()
7084+
}
7085+
case CmdTypeXInfoStreamFull:
7086+
if xInfoStreamFullCmd, ok := cmd.(interface{ Val() *XInfoStreamFull }); ok {
7087+
return xInfoStreamFullCmd.Val()
7088+
}
7089+
case CmdTypeZSlice:
7090+
if zSliceCmd, ok := cmd.(interface{ Val() []Z }); ok {
7091+
return zSliceCmd.Val()
7092+
}
7093+
case CmdTypeZWithKey:
7094+
if zWithKeyCmd, ok := cmd.(interface{ Val() *ZWithKey }); ok {
7095+
return zWithKeyCmd.Val()
7096+
}
7097+
case CmdTypeScan:
7098+
if scanCmd, ok := cmd.(interface{ Val() ([]string, uint64) }); ok {
7099+
keys, cursor := scanCmd.Val()
7100+
return CmdTypeScanValue{keys: keys, cursor: cursor}
7101+
}
7102+
case CmdTypeClusterSlots:
7103+
if clusterSlotsCmd, ok := cmd.(interface{ Val() []ClusterSlot }); ok {
7104+
return clusterSlotsCmd.Val()
7105+
}
7106+
case CmdTypeGeoLocation:
7107+
if geoLocationCmd, ok := cmd.(interface{ Val() []GeoLocation }); ok {
7108+
return geoLocationCmd.Val()
7109+
}
7110+
case CmdTypeGeoSearchLocation:
7111+
if geoSearchLocationCmd, ok := cmd.(interface{ Val() []GeoLocation }); ok {
7112+
return geoSearchLocationCmd.Val()
7113+
}
7114+
case CmdTypeGeoPos:
7115+
if geoPosCmd, ok := cmd.(interface{ Val() []*GeoPos }); ok {
7116+
return geoPosCmd.Val()
7117+
}
7118+
case CmdTypeCommandsInfo:
7119+
if commandsInfoCmd, ok := cmd.(interface {
7120+
Val() map[string]*CommandInfo
7121+
}); ok {
7122+
return commandsInfoCmd.Val()
7123+
}
7124+
case CmdTypeSlowLog:
7125+
if slowLogCmd, ok := cmd.(interface{ Val() []SlowLog }); ok {
7126+
return slowLogCmd.Val()
7127+
}
7128+
case CmdTypeKeyValues:
7129+
if keyValuesCmd, ok := cmd.(interface{ Val() (string, []string) }); ok {
7130+
key, values := keyValuesCmd.Val()
7131+
return CmdTypeKeyValuesValue{key: key, values: values}
7132+
}
7133+
case CmdTypeZSliceWithKey:
7134+
if zSliceWithKeyCmd, ok := cmd.(interface{ Val() (string, []Z) }); ok {
7135+
key, zSlice := zSliceWithKeyCmd.Val()
7136+
return CmdTypeZSliceWithKeyValue{key: key, zSlice: zSlice}
7137+
}
7138+
case CmdTypeFunctionList:
7139+
if functionListCmd, ok := cmd.(interface{ Val() []Library }); ok {
7140+
return functionListCmd.Val()
7141+
}
7142+
case CmdTypeFunctionStats:
7143+
if functionStatsCmd, ok := cmd.(interface{ Val() FunctionStats }); ok {
7144+
return functionStatsCmd.Val()
7145+
}
7146+
case CmdTypeLCS:
7147+
if lcsCmd, ok := cmd.(interface{ Val() *LCSMatch }); ok {
7148+
return lcsCmd.Val()
7149+
}
7150+
case CmdTypeKeyFlags:
7151+
if keyFlagsCmd, ok := cmd.(interface{ Val() []KeyFlags }); ok {
7152+
return keyFlagsCmd.Val()
7153+
}
7154+
case CmdTypeClusterLinks:
7155+
if clusterLinksCmd, ok := cmd.(interface{ Val() []ClusterLink }); ok {
7156+
return clusterLinksCmd.Val()
7157+
}
7158+
case CmdTypeClusterShards:
7159+
if clusterShardsCmd, ok := cmd.(interface{ Val() []ClusterShard }); ok {
7160+
return clusterShardsCmd.Val()
7161+
}
7162+
case CmdTypeRankWithScore:
7163+
if rankWithScoreCmd, ok := cmd.(interface{ Val() RankScore }); ok {
7164+
return rankWithScoreCmd.Val()
7165+
}
7166+
case CmdTypeClientInfo:
7167+
if clientInfoCmd, ok := cmd.(interface{ Val() *ClientInfo }); ok {
7168+
return clientInfoCmd.Val()
7169+
}
7170+
case CmdTypeACLLog:
7171+
if aclLogCmd, ok := cmd.(interface{ Val() []*ACLLogEntry }); ok {
7172+
return aclLogCmd.Val()
7173+
}
7174+
case CmdTypeInfo:
7175+
if infoCmd, ok := cmd.(interface{ Val() string }); ok {
7176+
return infoCmd.Val()
7177+
}
7178+
case CmdTypeMonitor:
7179+
if monitorCmd, ok := cmd.(interface{ Val() string }); ok {
7180+
return monitorCmd.Val()
7181+
}
7182+
case CmdTypeJSON:
7183+
if jsonCmd, ok := cmd.(interface{ Val() string }); ok {
7184+
return jsonCmd.Val()
7185+
}
7186+
case CmdTypeJSONSlice:
7187+
if jsonSliceCmd, ok := cmd.(interface{ Val() []interface{} }); ok {
7188+
return jsonSliceCmd.Val()
7189+
}
7190+
case CmdTypeIntPointerSlice:
7191+
if intPointerSliceCmd, ok := cmd.(interface{ Val() []*int64 }); ok {
7192+
return intPointerSliceCmd.Val()
7193+
}
7194+
case CmdTypeScanDump:
7195+
if scanDumpCmd, ok := cmd.(interface{ Val() ScanDump }); ok {
7196+
return scanDumpCmd.Val()
7197+
}
7198+
case CmdTypeBFInfo:
7199+
if bfInfoCmd, ok := cmd.(interface{ Val() BFInfo }); ok {
7200+
return bfInfoCmd.Val()
7201+
}
7202+
case CmdTypeCFInfo:
7203+
if cfInfoCmd, ok := cmd.(interface{ Val() CFInfo }); ok {
7204+
return cfInfoCmd.Val()
7205+
}
7206+
case CmdTypeCMSInfo:
7207+
if cmsInfoCmd, ok := cmd.(interface{ Val() CMSInfo }); ok {
7208+
return cmsInfoCmd.Val()
7209+
}
7210+
case CmdTypeTopKInfo:
7211+
if topKInfoCmd, ok := cmd.(interface{ Val() TopKInfo }); ok {
7212+
return topKInfoCmd.Val()
7213+
}
7214+
case CmdTypeTDigestInfo:
7215+
if tDigestInfoCmd, ok := cmd.(interface{ Val() TDigestInfo }); ok {
7216+
return tDigestInfoCmd.Val()
7217+
}
7218+
case CmdTypeFTSearch:
7219+
if ftSearchCmd, ok := cmd.(interface{ Val() FTSearchResult }); ok {
7220+
return ftSearchCmd.Val()
7221+
}
7222+
case CmdTypeFTInfo:
7223+
if ftInfoCmd, ok := cmd.(interface{ Val() FTInfoResult }); ok {
7224+
return ftInfoCmd.Val()
7225+
}
7226+
case CmdTypeFTSpellCheck:
7227+
if ftSpellCheckCmd, ok := cmd.(interface{ Val() []SpellCheckResult }); ok {
7228+
return ftSpellCheckCmd.Val()
7229+
}
7230+
case CmdTypeFTSynDump:
7231+
if ftSynDumpCmd, ok := cmd.(interface{ Val() []FTSynDumpResult }); ok {
7232+
return ftSynDumpCmd.Val()
7233+
}
7234+
case CmdTypeAggregate:
7235+
if aggregateCmd, ok := cmd.(interface{ Val() *FTAggregateResult }); ok {
7236+
return aggregateCmd.Val()
7237+
}
7238+
case CmdTypeTSTimestampValue:
7239+
if tsTimestampValueCmd, ok := cmd.(interface{ Val() TSTimestampValue }); ok {
7240+
return tsTimestampValueCmd.Val()
7241+
}
7242+
case CmdTypeTSTimestampValueSlice:
7243+
if tsTimestampValueSliceCmd, ok := cmd.(interface{ Val() []TSTimestampValue }); ok {
7244+
return tsTimestampValueSliceCmd.Val()
7245+
}
70197246
case CmdTypeStringSlice:
70207247
if stringSliceCmd, ok := cmd.(interface{ Val() []string }); ok {
70217248
return stringSliceCmd.Val()
@@ -7032,6 +7259,14 @@ func ExtractCommandValue(cmd interface{}) interface{} {
70327259
if floatSliceCmd, ok := cmd.(interface{ Val() []float64 }); ok {
70337260
return floatSliceCmd.Val()
70347261
}
7262+
case CmdTypeSlice:
7263+
if sliceCmd, ok := cmd.(interface{ Val() []interface{} }); ok {
7264+
return sliceCmd.Val()
7265+
}
7266+
case CmdTypeKeyValueSlice:
7267+
if keyValueSliceCmd, ok := cmd.(interface{ Val() []KeyValue }); ok {
7268+
return keyValueSliceCmd.Val()
7269+
}
70357270
case CmdTypeMapStringString:
70367271
if mapCmd, ok := cmd.(interface{ Val() map[string]string }); ok {
70377272
return mapCmd.Val()
@@ -7042,7 +7277,7 @@ func ExtractCommandValue(cmd interface{}) interface{} {
70427277
}
70437278
case CmdTypeMapStringInterfaceSlice:
70447279
if mapCmd, ok := cmd.(interface {
7045-
Val() map[string][]interface{}
7280+
Val() []map[string]interface{}
70467281
}); ok {
70477282
return mapCmd.Val()
70487283
}
@@ -7051,12 +7286,12 @@ func ExtractCommandValue(cmd interface{}) interface{} {
70517286
return mapCmd.Val()
70527287
}
70537288
case CmdTypeMapStringStringSlice:
7054-
if mapCmd, ok := cmd.(interface{ Val() map[string][]string }); ok {
7289+
if mapCmd, ok := cmd.(interface{ Val() []map[string]string }); ok {
70557290
return mapCmd.Val()
70567291
}
70577292
case CmdTypeMapMapStringInterface:
70587293
if mapCmd, ok := cmd.(interface {
7059-
Val() map[string][]interface{}
7294+
Val() map[string]interface{}
70607295
}); ok {
70617296
return mapCmd.Val()
70627297
}

0 commit comments

Comments
 (0)