Commit a497ce2

pauldix authored and jvshahid committed
Add Shard Spaces and Retention Policies
Fixes #571. Removed the old long term and short term shards. Now, if a user doesn't manually create a shard space, a catch-all space called "default" is created the first time they write data. Added API endpoints to list, create, and delete shard spaces. Made a breaking change to the old get shards API by changing the response format, since shortTerm and longTerm shards are no longer guaranteed to be present. Shard spaces can be mapped to a single database. Retention policies are enforced per server by a background routine that periodically checks shards in spaces with retention policies to see whether they have passed their expiration.
1 parent a6750fd commit a497ce2

22 files changed: 812 additions & 790 deletions

config.sample.toml

Lines changed: 3 additions & 41 deletions
@@ -94,6 +94,9 @@ point-batch-size = 100
 # reduce the memory usage, but will result in slower writes.
 write-batch-size = 5000000
 
+# The server will check this often for shards that have expired and should be cleared.
+retention-sweep-period = "10m"
+
 [storage.engines.leveldb]
 
 # Maximum mmap open files, this will affect the virtual memory used by
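Per the commit message, retention is enforced by a per-server background routine that wakes up every retention-sweep-period and drops shards whose retention window has passed. Below is a minimal sketch of that loop, not the commit's actual code: the Server receiver, shardsInSpace, dropShard, and the Retention duration field are all assumed for illustration (only clusterConfig.GetShardSpaces appears in this diff).

// Sketch of the retention sweep; identifiers other than GetShardSpaces
// are hypothetical stand-ins for whatever this commit actually uses.
func (self *Server) enforceRetention(sweepPeriod time.Duration) {
    ticker := time.NewTicker(sweepPeriod) // e.g. the 10m default above
    defer ticker.Stop()
    for range ticker.C {
        for _, space := range self.clusterConfig.GetShardSpaces() {
            if space.Retention == 0 { // assume a zero duration means "keep forever"
                continue
            }
            for _, shard := range self.shardsInSpace(space) {
                // a shard has expired once its end time falls outside
                // the space's retention window
                if time.Since(shard.EndTime()) > space.Retention {
                    self.dropShard(shard.Id())
                }
            }
        }
    }
}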
@@ -169,47 +172,6 @@ max-response-buffer-size = 100
 # that you don't need to buffer in memory, but you won't get the best performance.
 concurrent-shard-query-limit = 10
 
-# These options specify how data is sharded across the cluster. There are two
-# shard configurations that have the same knobs: short term and long term.
-# Any series that begins with a capital letter like Exceptions will be written
-# into the long term storage. Any series beginning with a lower case letter
-# like exceptions will be written into short term. The idea being that you
-# can write high precision data into short term and drop it after a couple
-# of days. Meanwhile, continuous queries can run downsampling on the short term
-# data and write into the long term area.
-[sharding]
-# how many servers in the cluster should have a copy of each shard.
-# this will give you high availability and scalability on queries
-replication-factor = 1
-
-[sharding.short-term]
-# each shard will have this period of time. Note that it's best to have
-# group by time() intervals on all queries be less than this setting. If they are,
-# then the aggregate is calculated locally. Otherwise, all that data gets sent
-# over the network when doing a query.
-duration = "7d"
-
-# split will determine how many shards to split each duration into. For example,
-# if we created a shard for 2014-02-10 and split was set to 2, then two shards
-# would be created that have the data for 2014-02-10. By default, data will
-# be split into those two shards deterministically by hashing the (database, series)
-# tuple. That means that data for a given series will be written to a single shard,
-# making querying efficient. That can be overridden with the next option.
-split = 1
-
-# You can override the split behavior to have the data for series that match a
-# given regex be randomly distributed across the shards for a given interval.
-# You can use this if you have a hot spot for a given time series writing more
-# data than a single server can handle. Most people won't have to resort to this
-# option. Also note that using this option means that queries will have to send
-# all data over the network so they won't be as efficient.
-# split-random = "/^hf.*/"
-
-[sharding.long-term]
-duration = "30d"
-split = 1
-# split-random = "/^Hf.*/"
-
 [wal]
 
 dir = "/tmp/influxdb/development/wal"

src/api/http/api.go

Lines changed: 59 additions & 10 deletions
@@ -151,6 +151,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
     self.registerEndpoint(p, "post", "/cluster/shards", self.createShard)
     self.registerEndpoint(p, "get", "/cluster/shards", self.getShards)
     self.registerEndpoint(p, "del", "/cluster/shards/:id", self.dropShard)
+    self.registerEndpoint(p, "get", "/cluster/shard_spaces", self.getShardSpaces)
+    self.registerEndpoint(p, "post", "/cluster/shard_spaces", self.createShardSpace)
+    self.registerEndpoint(p, "del", "/cluster/shard_spaces/:db/:name", self.dropShardSpace)
 
     // return whether the cluster is in sync or not
     self.registerEndpoint(p, "get", "/sync", self.isInSync)
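The three new shard-space routes sit alongside the existing /cluster/shards ones and go through the same cluster-admin check (tryAsClusterAdmin). Assuming the default API port (8086) and the usual u/p query-string authentication, usage would look roughly like:

# list all shard spaces
curl "http://localhost:8086/cluster/shard_spaces?u=root&p=root"

# create a shard space (see the example body after createShardSpace below)
curl -X POST -d @space.json "http://localhost:8086/cluster/shard_spaces?u=root&p=root"

# drop the shard space "default" on database "mydb"
curl -X DELETE "http://localhost:8086/cluster/shard_spaces/mydb/default?u=root&p=root"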
@@ -1018,7 +1021,8 @@ type newShardInfo struct {
     StartTime int64               `json:"startTime"`
     EndTime   int64               `json:"endTime"`
     Shards    []newShardServerIds `json:"shards"`
-    LongTerm  bool                `json:"longTerm"`
+    SpaceName string              `json:"spaceName"`
+    Database  string              `json:"database"`
 }
 
 type newShardServerIds struct {
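With this change, a createShard request identifies the target shard space and database by name instead of passing a longTerm flag. A request body matching the struct above would look like this (values illustrative; the serverIds key assumes newShardServerIds serializes its server list the same way the getShards handler below reports it):

{
  "startTime": 1391990400,
  "endTime": 1392595200,
  "shards": [{"serverIds": [1, 2]}],
  "spaceName": "default",
  "database": "mydb"
}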
@@ -1038,16 +1042,13 @@ func (self *HttpServer) createShard(w libhttp.ResponseWriter, r *libhttp.Request
     }
     shards := make([]*cluster.NewShardData, 0)
 
-    shardType := cluster.SHORT_TERM
-    if newShards.LongTerm {
-        shardType = cluster.LONG_TERM
-    }
     for _, s := range newShards.Shards {
         newShardData := &cluster.NewShardData{
             StartTime: time.Unix(newShards.StartTime, 0),
             EndTime:   time.Unix(newShards.EndTime, 0),
             ServerIds: s.ServerIds,
-            Type:      shardType,
+            SpaceName: newShards.SpaceName,
+            Database:  newShards.Database,
         }
         shards = append(shards, newShardData)
     }
@@ -1061,10 +1062,18 @@
 
 func (self *HttpServer) getShards(w libhttp.ResponseWriter, r *libhttp.Request) {
     self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
-        result := make(map[string]interface{})
-        result["shortTerm"] = self.convertShardsToMap(self.clusterConfig.GetShortTermShards())
-        result["longTerm"] = self.convertShardsToMap(self.clusterConfig.GetLongTermShards())
-        return libhttp.StatusOK, result
+        shards := self.clusterConfig.GetShards()
+        shardMaps := make([]map[string]interface{}, 0, len(shards))
+        for _, s := range shards {
+            shardMaps = append(shardMaps, map[string]interface{}{
+                "id":        s.Id(),
+                "endTime":   s.EndTime().Unix(),
+                "startTime": s.StartTime().Unix(),
+                "serverIds": s.ServerIds(),
+                "spaceName": s.SpaceName,
+                "database":  s.Database})
+        }
+        return libhttp.StatusOK, shardMaps
     })
 }
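This is the breaking change called out in the commit message: GET /cluster/shards no longer returns a map keyed by shortTerm and longTerm, but a flat list. Based on the handler above, a response would have this shape (values illustrative):

[
  {
    "id": 1,
    "startTime": 1391990400,
    "endTime": 1392595200,
    "serverIds": [1, 2],
    "spaceName": "default",
    "database": "mydb"
  }
]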

@@ -1123,3 +1132,43 @@ func (self *HttpServer) convertShardsToMap(shards []*cluster.ShardData) []interf
     }
     return result
 }
+
+func (self *HttpServer) getShardSpaces(w libhttp.ResponseWriter, r *libhttp.Request) {
+    self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
+        return libhttp.StatusOK, self.clusterConfig.GetShardSpaces()
+    })
+}
+
+func (self *HttpServer) createShardSpace(w libhttp.ResponseWriter, r *libhttp.Request) {
+    self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
+        space := &cluster.ShardSpace{}
+        body, err := ioutil.ReadAll(r.Body)
+        if err != nil {
+            return libhttp.StatusInternalServerError, err.Error()
+        }
+        err = json.Unmarshal(body, space)
+        if err != nil {
+            return libhttp.StatusInternalServerError, err.Error()
+        }
+        err = space.Validate(self.clusterConfig)
+        if err != nil {
+            return libhttp.StatusBadRequest, err.Error()
+        }
+        err = self.raftServer.CreateShardSpace(space)
+        if err != nil {
+            return libhttp.StatusInternalServerError, err.Error()
+        }
+        return libhttp.StatusOK, nil
+    })
+}
+
+func (self *HttpServer) dropShardSpace(w libhttp.ResponseWriter, r *libhttp.Request) {
+    self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
+        name := r.URL.Query().Get(":name")
+        db := r.URL.Query().Get(":db")
+        if err := self.raftServer.DropShardSpace(db, name); err != nil {
+            return libhttp.StatusInternalServerError, err.Error()
+        }
+        return libhttp.StatusOK, nil
+    })
+}
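createShardSpace unmarshals the request body directly into cluster.ShardSpace, whose definition isn't part of this diff. Assuming the field set shard spaces are documented with in this release (name, database, a series-matching regex, retention and shard durations, replication factor, and split), a create request body might look like:

{
  "name": "high_precision",
  "database": "mydb",
  "regex": "/^[a-z].*/",
  "retentionPolicy": "7d",
  "shardDuration": "1d",
  "replicationFactor": 1,
  "split": 1
}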
