Cache Management System: Carbon prunes and caches all block/blocklet index information into the driver for normal table to increase the query performance by reducing the number of files which are read. This caching mechanism causes the driver to become a bottleneck in the following ways:
- If the cache size becomes huge(70–80% of the driver memory) then there can be excessive GC in the driver which can slow down the query and the driver may even go OutOfMemory.
- There will be a hard limit to how much LRU can be saved. In case when the cache is full and it needs to accommodate new objects it has to evict elements from the cache which would in turn slow down the queries.
- Multiple JDBC drivers need to maintain their own copy of the cache.
The idea behind a Distributed Index Cache Server is to solve the above mentioned problems using just one solution.
Index Server: When enabled, any query on a carbon table will be routed to the index server service in form of a request. The request will consist of the table name, segments, filter expression and other information used for pruning.
In IndexServer service a pruning RDD is fired which will take care of the pruning for that request. This RDD will be creating tasks based on the number of segments that are applicable for pruning. It can happen that the user has specified segments to access for that table, so only the specified segments would be applicable for pruning. Refer: query-data-with-specified-segments. IndexServer driver would have 2 important tasks, distributing the segments equally among the available executors and keeping track of the executor where the segment is cached.
To achieve this 2 separate mappings would be maintained as follows.
- segment to executor location: This mapping will be maintained for each table and will enable the index server to track the cache location for each segment.
- Cache size held by each executor: This mapping will be used to distribute the segments equally(on the basis of size) among the executors.
Once a request is received each segment would be iterated over and checked against tableToExecutorMapping to find if an executor is already assigned. If a mapping already exists then it means that most probably(if not evicted by LRU) the segment is already cached in that executor and the task for that segment has to be fired on this executor.
If mapping is not found then first check executorToCacheMapping against the available executor list to find if any unassigned executor is present and use that executor for the current segment. If all the executors are assigned with some segment then find the least loaded executor on the basis of size.
Initially the segment index size would be used to distribute the segments fairly among the executor because the actual cache size would be known to the driver only when the segments are cached and appropriate information is returned to the driver.
The JDBC server can connect to Index Server by using various number of APIs each designed for specific purpose depending on the query that has been fired by the user. The APIs are as follows:
- getSplits(): Will be used in case of full scan query or a query with a filter
- getCount(): Will be used in case of count(*) query and will directly return the total count.
- showCache(): Used to see the size of cache in the index server. Used in show cache command.
- invalidateCache() : Used to remove the cache of segments from the Index Server. Used during drop table command.
After the job is completed the tasks would return the cache size held by each executor which would be updated to the executorToCacheMapping and the pruned blocklets which would be further used for result fetching. If the response is too huge then it is better to write the splits to a file so that the driver can read this file and create the splits to avoid serialisation/deserialisation.
Using this solution would allow multiple JDBC drivers to connect to the index server to use the cache.
Reallocation for Dead Executor(s)
In case executor(s) become dead/unavailable then the segments that were earlier being handled by those would be reassigned to some other executor using the distribution logic and the cache loading would be done again in the new executor
In case of any failure the index server would fallback to embedded mode which means that the JDBCServer would take care of distributed pruning. A similar job would be fired by the JDBCServer which would take care of pruning using its own executors. If for any reason the embedded mode also fails to prune the indexes then the job would be passed on to driver.
We tested the performance of Index Server on tables of various sizes. The performance statistics are as follows:
There is an improvement of 10–20 percent for all the queries and a huge improvement(3–4 times) in case of count(*) queries.