EclipseLink/DesignDocs/328937
Design Specification: Data Partitioning
Document History
Date | Author | Version Description & Notes |
---|---|---|
2010-10-28 | James | 0.1 Draft |
2010-11-29 | James | 0.2 Update |
Project overview
Provide access to a database cluster to improve scalability and fault tolerance.
Concepts
Horizontal Partitioning - Splitting the data in a table/database into separate logical partitions and storing each partition in a separate database instance.
Vertical Partitioning - Splitting the data in a database by table into separate database instances.
HA - High availability: tolerating a single point of failure, so that the system continues to function if one server goes down.
Oracle RAC - clustered database where each node replicates the entire data of the database.
UCP - Universal Connection Pool, part of Oracle JDBC, which provides extended support for Oracle RAC.
Requirements
Phase 1:
- Support data partitioning using range partitions.
- Support round robin load balancing.
- Support replication.
- Support data partitioning using hash partitions.
- Support data partitioning using value partitions.
- Support pinned/vertical partitioning.
Phase 2:
- Support fail-over.
- Support Oracle UCP and WebLogic RAC aware DataSources.
- Support Oracle RAC using a separate data source per node.
Design Constraints
Must support non-cluster-aware databases, as well as other databases and application servers.
Support Oracle RAC.
Support WebLogic.
Functionality
A PartitioningPolicy will be added to ClassDescriptor, AbstractSession, and DatabaseQuery. The policy will be used where a connection is required. For a query for a class, the descriptor policy will be used first, then the session's policy, then the default behavior. If the query has its own policy it will be used first.
A partition policy will also be settable on a mapping, which will set it on each of the mapping's queries.
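The lookup order described above can be sketched as follows. This is only an illustration, not EclipseLink code: `PolicyResolutionSketch` is a hypothetical name, and policies are represented by plain strings instead of real PartitioningPolicy objects.

```java
import java.util.Optional;

// Hypothetical stand-in: each policy is represented by its name only.
final class PolicyResolutionSketch {
    private PolicyResolutionSketch() {}

    /**
     * Returns the first non-null policy in the order query, descriptor, session.
     * An empty result means "fall back to the default behavior".
     */
    static Optional<String> resolve(String queryPolicy,
                                    String descriptorPolicy,
                                    String sessionPolicy) {
        if (queryPolicy != null) {
            return Optional.of(queryPolicy);
        }
        if (descriptorPolicy != null) {
            return Optional.of(descriptorPolicy);
        }
        return Optional.ofNullable(sessionPolicy);
    }
}
```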
Before executing a Call, the server session will check for a partition policy and ask it for a set of Accessors. The call will be executed on each accessor (depending on the type of call). Modify calls will execute on all accessors and return a row count of 0 if any accessor returns 0. Single result calls will return the first result. Multiple result calls will union the lists of results from all connections.
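A rough sketch of how the per-accessor results might be combined is shown below. The class and method names are hypothetical. The text does not say what a modify call returns when no accessor reports 0, whether "first result" skips nulls, or whether the union removes duplicates, so the choices here (first accessor's count, first non-null result, plain concatenation) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing one way to merge results from several accessors.
final class CallResultMergeSketch {
    private CallResultMergeSketch() {}

    /** Modify calls: 0 if any accessor reported 0 rows, else the first count (assumption). */
    static int mergeRowCounts(List<Integer> rowCounts) {
        if (rowCounts.isEmpty()) {
            throw new IllegalArgumentException("at least one accessor is required");
        }
        for (int count : rowCounts) {
            if (count == 0) {
                return 0;
            }
        }
        return rowCounts.get(0);
    }

    /** Single result calls: the first non-null result (assumption), or null if none. */
    static <T> T firstResult(List<T> results) {
        for (T result : results) {
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    /** Multiple result calls: concatenation of all lists, duplicates kept (assumption). */
    static <T> List<T> unionResults(List<List<T>> perAccessorResults) {
        List<T> all = new ArrayList<>();
        for (List<T> results : perAccessorResults) {
            all.addAll(results);
        }
        return all;
    }
}
```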
A DatabaseQuery will have a List of Accessors instead of a single Accessor. A ClientSession will have a Map of Accessors (keyed on pool name) instead of a single writeAccessor. A ClientSession will support lazily adding accessors to the current transaction. On commit the ClientSession will commit all registered accessors.
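The lazy registration and commit behavior can be illustrated with a small model. `ClientSessionSketch` and `FakeAccessor` are hypothetical stand-ins and do not reflect the real ClientSession or Accessor classes; a real implementation would also have to handle a failure part-way through the commit loop, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a client session holding one write accessor per pool.
final class ClientSessionSketch {

    /** Minimal fake accessor that only records its transaction state. */
    static final class FakeAccessor {
        final String poolName;
        boolean inTransaction;
        boolean committed;

        FakeAccessor(String poolName) {
            this.poolName = poolName;
        }
    }

    // Keyed on pool name; insertion order is kept so commits happen in first-use order.
    private final Map<String, FakeAccessor> writeAccessors = new LinkedHashMap<>();

    /** Returns the accessor for the pool, creating it and joining the transaction on first use. */
    FakeAccessor accessorFor(String poolName) {
        return writeAccessors.computeIfAbsent(poolName, name -> {
            FakeAccessor accessor = new FakeAccessor(name);
            accessor.inTransaction = true;
            return accessor;
        });
    }

    /** Commits every registered accessor and returns the pool names in commit order. */
    List<String> commit() {
        List<String> committedPools = new ArrayList<>();
        for (FakeAccessor accessor : writeAccessors.values()) {
            accessor.committed = true;
            accessor.inTransaction = false;
            committedPools.add(accessor.poolName);
        }
        writeAccessors.clear();
        return committedPools;
    }
}
```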
The user will be able to subclass the PartitioningPolicy to provide their own partitioning mechanism. The callback will take the AbstractSession, the DatabaseQuery, and an AbstractRecord of query arguments, and will return a List of Accessors to use for the query. This API gives the policy the flexibility to use whichever connections it needs for the current context. The same API will be defined on AbstractSession, and the current getAccessor(Class)/getAccessor(String) APIs used by SessionBroker will be refactored to use this unified API, which allows the SessionBroker to check the query for the class and sessionName.
The API is:
public List<Accessor> getConnectionsForQuery(AbstractSession session, DatabaseQuery query, AbstractRecord arguments)
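A user-defined policy built on a callback of this shape might look roughly like the sketch below. So that the example compiles on its own, it uses simplified stand-ins: `Accessor`, `Session`, and `PartitioningPolicy` here are nested toy types, the query parameter is omitted, and the arguments are a plain Map. The "tenant" argument and the pool naming scheme are invented for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Self-contained sketch; none of these nested types are the real EclipseLink classes.
final class CustomPolicySketch {
    private CustomPolicySketch() {}

    /** Stand-in for Accessor: identified only by the pool it came from. */
    static final class Accessor {
        final String pool;

        Accessor(String pool) {
            this.pool = pool;
        }
    }

    /** Stand-in for AbstractSession: hands out an accessor for a named pool. */
    interface Session {
        Accessor accessorFor(String pool);
    }

    /** Simplified form of the proposed callback (query omitted, arguments as a Map). */
    abstract static class PartitioningPolicy {
        abstract List<Accessor> getConnectionsForQuery(Session session, Map<String, Object> arguments);
    }

    /** Example subclass: routes on a hypothetical "tenant" argument. */
    static final class TenantPolicy extends PartitioningPolicy {
        @Override
        List<Accessor> getConnectionsForQuery(Session session, Map<String, Object> arguments) {
            Object tenant = arguments.get("tenant");
            String pool = (tenant == null) ? "default" : "tenant-" + tenant;
            return Collections.singletonList(session.accessorFor(pool));
        }
    }
}
```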
A RoundRobinPolicy will cycle through a list of connection pools to distribute the load evenly. It will have an option to only load balance read queries, and to replicate write queries.
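The cycling behavior can be sketched as below. `RoundRobinSketch` is a hypothetical class that returns pool names instead of accessors, and the atomic counter is one possible way to make the rotation thread-safe, not necessarily how the policy will be implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: picks pool names in rotation, optionally replicating writes.
final class RoundRobinSketch {
    private final List<String> pools;
    private final boolean replicateWrites;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinSketch(List<String> pools, boolean replicateWrites) {
        if (pools.isEmpty()) {
            throw new IllegalArgumentException("at least one connection pool is required");
        }
        this.pools = new ArrayList<>(pools);
        this.replicateWrites = replicateWrites;
    }

    /** Returns the pools a query should run on. */
    List<String> poolsFor(boolean isWriteQuery) {
        if (isWriteQuery && replicateWrites) {
            // Replicated writes go to every pool and do not advance the rotation.
            return new ArrayList<>(pools);
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), pools.size());
        return Collections.singletonList(pools.get(index));
    }
}
```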
A RangePartitionPolicy will map a query parameter name to a node based on its value and a set of ranges. If the query does not define the parameter, the policy will either use the session default behavior or, optionally, send the query to all pools and union the results.
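A minimal sketch of the range lookup follows, restricted to integer values. `RangePartitionSketch` is a hypothetical class; treating both bounds as inclusive and treating a null end value as open-ended are assumptions, and an empty result stands for "use the session default behavior".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of range-based pool selection for integer parameters.
final class RangePartitionSketch {

    private static final class Range {
        final String pool;
        final int start;
        final Integer end; // null means no upper bound (assumption)

        Range(String pool, int start, Integer end) {
            this.pool = pool;
            this.start = start;
            this.end = end;
        }
    }

    private final List<Range> ranges = new ArrayList<>();
    private final Set<String> allPools = new LinkedHashSet<>();
    private final boolean unionUnpartitionableQueries;

    RangePartitionSketch(boolean unionUnpartitionableQueries) {
        this.unionUnpartitionableQueries = unionUnpartitionableQueries;
    }

    void addPartition(String pool, int startValue, Integer endValue) {
        ranges.add(new Range(pool, startValue, endValue));
        allPools.add(pool);
    }

    /** Returns the pools to query; an empty list means "use the session default". */
    List<String> poolsFor(Integer parameterValue) {
        if (parameterValue == null) {
            // The query did not define the partitioning parameter.
            if (unionUnpartitionableQueries) {
                return new ArrayList<>(allPools);
            }
            return Collections.emptyList();
        }
        for (Range range : ranges) {
            boolean aboveStart = parameterValue >= range.start;
            boolean belowEnd = range.end == null || parameterValue <= range.end;
            if (aboveStart && belowEnd) {
                return Collections.singletonList(range.pool);
            }
        }
        return Collections.emptyList();
    }
}
```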
A ValuePartitionPolicy will behave the same as the range policy, but map a value to a pool instead of a range. It will also define a default pool to use for any unmapped values.
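The value lookup with a default pool reduces to a map lookup, as in this sketch. `ValuePartitionSketch` is a hypothetical class, and it returns a pool name, not an accessor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: exact-value lookup with a fallback pool for unmapped values.
final class ValuePartitionSketch {
    private final Map<Object, String> partitions = new HashMap<>();
    private final String defaultPool;

    ValuePartitionSketch(String defaultPool) {
        this.defaultPool = defaultPool;
    }

    void addPartition(String pool, Object value) {
        partitions.put(value, pool);
    }

    /** Returns the pool mapped to the value, or the default pool if the value is unmapped. */
    String poolFor(Object value) {
        return partitions.getOrDefault(value, defaultPool);
    }
}
```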
A HashPartitionPolicy will hash the parameter value into a list of connection pools.
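One simple way to hash a value into a pool list is to take the value's hash code modulo the number of pools, as sketched below. `HashPartitionSketch` is a hypothetical class, and the use of `hashCode()` is an assumption; the design does not specify the hash function.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: maps a parameter value onto one pool by hash code.
final class HashPartitionSketch {
    private HashPartitionSketch() {}

    static String poolFor(Object parameterValue, List<String> pools) {
        if (pools.isEmpty()) {
            throw new IllegalArgumentException("at least one connection pool is required");
        }
        // floorMod avoids a negative index when the hash code is negative.
        int index = Math.floorMod(Objects.hashCode(parameterValue), pools.size());
        return pools.get(index);
    }
}
```

Note that with this scheme, adding or removing a pool changes the mapping for most values, so existing data would have to be redistributed.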
A ReplicationPolicy will send write queries to a set of connection pools.
A UnionPartitionPolicy will send read queries to a set of connection pools, and have an option to replicate writes.
All policies will provide an exclusive connection option. This will assign an accessor to the client session on the first query execution and use that connection for the duration of the client session. This will ensure that the entire transaction stays on the same node.
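The exclusive connection option amounts to remembering the first choice for the lifetime of the client session. The sketch below models this with pool names; `ExclusiveConnectionSketch` is a hypothetical class and one instance is assumed to exist per client session.

```java
// Hypothetical sketch: pins a client session to the first pool its policy selects.
final class ExclusiveConnectionSketch {
    private String pinnedPool; // null until the first query executes

    /**
     * Takes the pool the policy would choose for the current query and
     * returns the pool actually used, which never changes after the first call.
     */
    String poolFor(String policyChoice) {
        if (pinnedPool == null) {
            pinnedPool = policyChoice;
        }
        return pinnedPool;
    }
}
```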
Partitioning policies will be globally (persistence unit) named objects and be reusable across multiple descriptor or queries. This will improve the usability of configuration, specifically with JPA annotations and XML.
The persistence unit properties will be updated to support adding named connection pools in addition to the existing configuration for read/write/sequence.
Fail-over...
RAC/UCP
Some cluster-enabled databases define their own DataSource implementation that is cluster aware. Some also support data affinity and integration with a data affinity service such as the one EclipseLink provides. Oracle RAC is supported through the Oracle JDBC Universal Connection Pool (UCP). UCP supports a single DataSource into the RAC and can perform its own load balancing and fail-over. UCP also supports a data affinity callback API. A callback can be registered to give UCP a hint as to which node to direct the connection request to.
A generic DataPartitioningCallback interface will be defined in EclipseLink (platform.database.partitioning) to support integration with an external DataSource's data affinity. The callback will be given a chance to register itself with the DataSource on connect. The PartitioningPolicy implementations will have to set the partition id into the callback instead of acquiring a connection from a connection pool.
A UCPDataPartitioningCallback will be added to EclipseLink's Oracle component with a dependency on ucp.jar. It will register with the UCP DataSource and maintain a ThreadLocal to store the current partition id to pass to UCP when its callback is invoked.
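The ThreadLocal hand-off can be sketched without any dependency on ucp.jar. The method names on `DataPartitioningCallback` below are assumptions, since the text names the interface but not its methods, and `currentPartitionId()` stands in for whatever UCP would invoke on its own callback API; no real UCP types are used.

```java
// Self-contained sketch; the interface shape and all method names are assumptions.
final class PartitioningCallbackSketch {
    private PartitioningCallbackSketch() {}

    /** Hypothetical shape of the generic callback. */
    interface DataPartitioningCallback {
        /** Called on connect so the callback can attach itself to the DataSource. */
        void register(Object dataSource);

        /** Called by a partitioning policy before a connection is requested. */
        void setPartitionId(int id);
    }

    /** Stores the partition id per thread so concurrent requests do not interfere. */
    static final class ThreadLocalCallback implements DataPartitioningCallback {
        private final ThreadLocal<Integer> partitionId = new ThreadLocal<>();
        private volatile Object registeredDataSource;

        @Override
        public void register(Object dataSource) {
            // A real implementation would register with the DataSource's affinity API here.
            this.registeredDataSource = dataSource;
        }

        @Override
        public void setPartitionId(int id) {
            partitionId.set(id);
        }

        /** What the DataSource would ask for when routing; null means no preference. */
        Integer currentPartitionId() {
            return partitionId.get();
        }

        boolean isRegistered() {
            return registeredDataSource != null;
        }
    }
}
```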
Testing
Testing will need to be done both on a set of non-cluster-aware databases and on an Oracle RAC. To allow for easy testing in any environment, tests should be runnable on a set of embedded Derby databases. Since Derby is not cluster aware, writes will need to be replicated to allow the testing of round robin and load balancing policies.
Each of the supported policies will need to be tested.
A set of the tests should be runnable inside an application server such as WebLogic accessing an Oracle RAC.
Some performance/benchmarking should be done to ensure the scalability of the solutions.
Dedicated tests will be required to verify fail-over.
API
- PartitioningPolicy
- setReplicateWrites(boolean)
- @Partitioning(name=String, partitioningClass=Class<PartitioningPolicy>)
- RoundRobinPartitioningPolicy
- addConnectionPool(String)
- @RoundRobinPartitioning(name=String, connectionPools={String}, replicateWrites=boolean)
- UnionPartitioningPolicy
- addConnectionPool(String)
- setReplicateWrites(boolean)
- @UnionPartitioning(name=String, connectionPools={String}, replicateWrites=boolean)
- ReplicationPartitioningPolicy
- addConnectionPool(String)
- @ReplicationPartitioning(name=String, connectionPools={String})
- FieldPartitioningPolicy
- setParameterName(String)
- setUnionUnpartitionableQueries(boolean)
- RangePartitioningPolicy
- addPartition(String pool, Object startValue, Object endValue)
- @RangePartitioning(name=String, partitionColumn=@Column, partitions={@RangePartition(connectionPool=String, startValue=String, endValue=String)}, unionUnpartitionableQueries=boolean, partitionValueType=Class)
- ValuePartitioningPolicy
- addPartition(String pool, Object value)
- @ValuePartitioning(name=String, partitionColumn=@Column, partitions={@ValuePartition(connectionPool=String, value=String)}, unionUnpartitionableQueries=boolean, partitionValueType=Class)
- HashPartitioningPolicy
- addConnectionPool(String pool)
- @HashPartitioning(name=String, partitionColumn=@Column, connectionPools={String}, unionUnpartitionableQueries=boolean)
- @Partitioned(String name)
DatabaseQuery
- setPartitioningPolicy(PartitioningPolicy)
- QueryHint - "eclipselink.partitioning-policy" = PartitioningPolicy/name
ClassDescriptor
- setPartitioningPolicyName(String)
- setPartitioningPolicy(PartitioningPolicy)
ForeignReferenceMapping
- setPartitioningPolicyName(String)
- setPartitioningPolicy(PartitioningPolicy)
ServerSession, Server
- setPartitioningPolicy(PartitioningPolicy)
DatabaseLogin
- setPartitioningCallback(DataPartitioningCallback)
Config files
- orm.xml
<partitioning-policy class="org.acme.MyPolicy"/>
<round-robin-policy replicate-writes="true">
    <connection-pool>node1</connection-pool>
    <connection-pool>node2</connection-pool>
</round-robin-policy>
<random-policy replicate-writes="true">
    <connection-pool>node1</connection-pool>
    <connection-pool>node2</connection-pool>
</random-policy>
<replication-policy>
    <connection-pool>node1</connection-pool>
    <connection-pool>node2</connection-pool>
</replication-policy>
<range-partitioning-policy parameter-name="id" exclusive-connection="true" union-unpartitionable-queries="true">
    <range-partition connection-pool="node1" start-value="0" end-value="100000" value-type="java.lang.Integer"/>
    <range-partition connection-pool="node2" start-value="100001" end-value="200000" value-type="java.lang.Integer"/>
    <range-partition connection-pool="node3" start-value="200001" value-type="java.lang.Integer"/>
</range-partitioning-policy>
- persistence.xml
Existing connection pool properties:
- "javax.persistence.jdbc.url"
- "javax.persistence.nonJtaDataSource"
- "javax.persistence.jtaDataSource"
- "eclipselink.jdbc.connections.initial"
- "eclipselink.jdbc.connections.min"
- "eclipselink.jdbc.connections.max"
- "eclipselink.jdbc.write-connections.initial"
- "eclipselink.jdbc.write-connections.min"
- "eclipselink.jdbc.write-connections.max"
- "eclipselink.jdbc.read-connections.initial"
- "eclipselink.jdbc.read-connections.min"
- "eclipselink.jdbc.read-connections.max"
- "eclipselink.jdbc.sequence-connection-pool.non-jta-data-source"
- "eclipselink.jdbc.sequence-connection-pool.initial"
- "eclipselink.jdbc.sequence-connection-pool.max"
- "eclipselink.jdbc.sequence-connection-pool.min"
Named connection pool properties:
- "eclipselink.connection-pool.<name>.initial"
- "eclipselink.connection-pool.<name>.min"
- "eclipselink.connection-pool.<name>.max"
- "eclipselink.connection-pool.<name>.url"
- "eclipselink.connection-pool.<name>.jtaDataSource"
- "eclipselink.connection-pool.<name>.nonJtaDataSource"
Old properties will be moved under "connection-pool" category and deprecated, i.e.
- "eclipselink.connection-pool.max"
- "eclipselink.connection-pool.write.max"
- "eclipselink.connection-pool.read.max"
- "eclipselink.connection-pool.sequence.max"
Partitioning properties:
- "eclipselink.partitioning" - sets default partitioning policy on the session (by name)
- "eclipselink.partitioning.callback" - sets external DataSource data affinity integration callback implementation (class name)
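As an illustration, the proposed named pool and partitioning properties could be assembled into a property map like the one below. The property keys follow the patterns listed above; the pool names, pool sizes, Derby URLs, and the policy name are example values chosen for this sketch, and `PartitioningPropertiesSketch` is a hypothetical class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: builds persistence unit properties for two named pools.
final class PartitioningPropertiesSketch {
    private PartitioningPropertiesSketch() {}

    static Map<String, String> properties() {
        Map<String, String> props = new LinkedHashMap<>();
        // Named connection pool "node2" (example values).
        props.put("eclipselink.connection-pool.node2.url", "jdbc:derby:node2;create=true");
        props.put("eclipselink.connection-pool.node2.min", "1");
        props.put("eclipselink.connection-pool.node2.max", "8");
        // Named connection pool "node3" (example values).
        props.put("eclipselink.connection-pool.node3.url", "jdbc:derby:node3;create=true");
        props.put("eclipselink.connection-pool.node3.min", "1");
        props.put("eclipselink.connection-pool.node3.max", "8");
        // Default partitioning policy for the session, referenced by name.
        props.put("eclipselink.partitioning", "RangePartitioningByPROJ_ID");
        return props;
    }
}
```

A map of this kind would typically be passed when creating the EntityManagerFactory, or the same keys could be written as property elements in persistence.xml.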
Examples
@Entity
@IdClass(EmployeePK.class)
@UnionPartitioning(
    name="UnionPartitioningAllNodes",
    replicateWrites=true)
@ValuePartitioning(
    name="ValuePartitioningByLOCATION",
    partitionColumn=@Column(name="LOCATION"),
    unionUnpartitionableQueries=true,
    defaultConnectionPool="default",
    partitions={
        @ValuePartition(connectionPool="node2", value="Ottawa"),
        @ValuePartition(connectionPool="node3", value="Toronto")
    })
@Partitioned("ValuePartitioningByLOCATION")
public class Employee {
    @Id
    @Column(name = "EMP_ID")
    private Integer id;

    @Id
    private String location;
    ...
    @ManyToMany(cascade = { PERSIST, MERGE })
    @Partitioned("UnionPartitioningAllNodes")
    private Collection<Project> projects;
    ...
}

@Entity
@RangePartitioning(
    name="RangePartitioningByPROJ_ID",
    partitionColumn=@Column(name="PROJ_ID"),
    partitionValueType=Integer.class,
    unionUnpartitionableQueries=true,
    partitions={
        @RangePartition(connectionPool="default", startValue="0", endValue="1000"),
        @RangePartition(connectionPool="node2", startValue="1000", endValue="2000"),
        @RangePartition(connectionPool="node3", startValue="2000")
    })
@Partitioned("RangePartitioningByPROJ_ID")
public class Project {
    @Id
    @Column(name="PROJ_ID")
    private Integer id;
    ...
}
Documentation
Should be documented under JPA and performance sections.
Open Issues
Issue # | Owner | Description / Notes |
---|---|---|
1 | Open | |
2 | Issue |
Decisions
Issue | Description / Notes | Decision |
---|---|---|
Future Considerations
- Additional partitioning policies.