001//
002// Name
003//  $RCSfile: Partition.java,v $
004// 
005// Copyright
006//      Copyright 2010-2011 Cloud Software Group, Inc. ALL RIGHTS RESERVED. 
007//      Cloud Software Group, Inc. Confidential Information
008//
009// History
010//  $Revision: 1.1.2.18 $ $Date: 2011/11/23 03:33:29 $
011//
012package com.kabira.platform.highavailability;
013
014import com.kabira.platform.disteng.DEReplicationType;
015import com.kabira.platform.disteng.DEProperties;
016import java.util.Date;
017
018/**
019 * The Partition class. This non-managed class contains a snapshot of a
020 * shared memory partition. Direct access to the Partition instance in
021 * shared memory is not allowed to avoid transaction locking issues.
022 * <p>
023 * Note that partitions are only visible on their active and replica
024 * nodes, not all nodes in the cluster.
025 */
026public final class Partition
027{
028    /**
029     * The Partition states
030     */
031    public enum State
032    {
033        /** The partition has been defined, but not enabled */
034        INITIAL,
035        /** The partition is active */
036        ACTIVE,
037        /**
038         * The partition definition is being updated. Entered when 
039         * partition membership (i.e. the objects associated with the
040         * partition) is being updated to split or merge partitions.
041         */
042        UPDATING,
043        /**
044         * The partition owner is being updated. Each object in the
045         * partition is sent to the new owner and the location definition
046         * of all instances are updated.
047         */
048        MIGRATING,
049        /**
050         * The partition replicas are being updated. Each object in the
051         * partition is sent to the new replica list.
052         */
053        REPLICATING,
054        /** 
055         * The partition is not active because all associated nodes have
056         * failed, the node was taken off-line because it is in a
057         * non-quorum state, or the node was restarted.
058         */
059        UNAVAILABLE
060    }
061
062    /**
063     * The Partition properties used when defining a partition
064     */
065    public final static class Properties
066    {
067        /**
068         * Default constructor.
069         */
070        public Properties()
071        {
072            m_restoreFromNode = null;
073            m_forceReplication = false;
074            m_objectsLockedPerTransaction = DefaultObjectsLockedPerTransaction;
075        }
076
077        /**
078         * Define the node that the partition should be restored from.
079         * <p>
080         * When this property is set, the partition defined on the given
081         * remote node is loaded to the local node. This should be done
082         * when restoring a node from a split-brain situation, where
083         * <b>nodeName</b> is the node in the cluster where all objects
084         * should be preserved, and the local node is the node being
085         * restored. Any conflicts during restore will preserve the
086         * objects on <b>nodeName</b>, and remove the conflicting objects
087         * on the local node.
088         * <p>
089         * A restore is needed when multiple nodes are currently the active
090         * node for a partition in a cluster due to a split-brain scenario.
091         * In this case, the application needs to decide which active node
092         * will be the node where the objects are preserved during a
093         * restore. Note that the <b>nodeName</b> does not necessarily have
094         * to be the node which becomes the partition's active node after the
095         * restore completes.
096         * <p>
097         * The actual restore of the partition is done in the
098         * enablePartitions() method when the JOIN_CLUSTER_RESTORE
099         * EnableAction is used. If any other EnableAction is used,
100         * object data isn't preserved, and no restoration of partition
101         * objects is done.
102         * <p>
103         * If restoreFromNode isn't set after a split-brain scenario, the
104         * runtime will perform a cluster wide broadcast to find the
105         * current active node, and use that node to restore instances in
106         * the partition. If multiple active nodes are found, the first
107         * responder is chosen.
108         * <p>
109         * @param nodeName The remote node to use when restoring the
110         *  partition's objects.
111         *
112         * @exception IllegalArgumentException
113         * The nodeName was empty.
114         * @see PartitionManager#definePartition(String,
115         *              Partition.Properties, String, ReplicaNode [])
116         * @see PartitionManager.EnableAction
117         */
118        public void restoreFromNode(String nodeName)
119            throws IllegalArgumentException
120        {
121            if (nodeName.isEmpty())
122            {
123                throw new IllegalArgumentException(
124                    "nodeName cannot be empty");
125            }
126            m_restoreFromNode = nodeName;
127        }
128
129        /**
130         * Get the current restoreFromNode property value.
131         * @return String containing value.
132         */
133        public final String getRestoreFromNode()
134        {
135            return m_restoreFromNode;
136        }
137
138        /**
139         * Determine how objects are replicated during a migrate of
140         * an object partition.
141         * <p>
142         * When set to true, a migrate() or
143         * definePartition()/enablePartition() will force the copy of
144         * partitioned objects to all pre-existing replica nodes. The
145         * default value for this property is false, objects are only
146         * copied to new replicas as they are added since the objects
147         * should already exist on the pre-existing replica nodes.
148         * <p>
149         * Normally, a migrate will skip the replication of objects to
150         * pre-existing nodes in the partition's replica node list. This
151         * allows applications to incrementally add replica nodes without
152         * having to copy the objects to replicas that already exist in
153         * the partition. However, if one or more replicas have gone
154         * offline, or were not discovered when the partition was first
155         * enabled, this property can be set to insure that objects are
156         * pushed to all replicas in the cluster.
157         * <p>
158         * Warning: This is performance hostile, and should only be done
159         * if the replica can't be manually taken offline and restored.
160         * <p>
161         * The value passed into definePartition() is stored and used in
162         * failover. The value passed to migrate() overrides
163         * the value passed to definePartition().
164         *
165         * @param enabled If true, force the copy of objects to all replicas
166         *  when a migrate() or enablePartition() is executed.
167         *
168         */
169        public void forceReplication(boolean enabled)
170        {
171            m_forceReplication = enabled;
172        }
173
174        /**
175         * Get the current forceReplication property value.
176         * @return boolean containing value.
177         */
178        public final boolean getForceReplication()
179        {
180            return m_forceReplication;
181        }
182
183        /**
184         * Define the number of objects locked in a transaction when performing
185         * a migrate() or update().
186         * <p>
187         * When distribution performs a migrate() or update(), or when it
188         * performs a migrate() as part of failover, the work is done in
189         * units of work defined by objectsLockedPerTransaction. This
190         * allows applications to concurrently run while the work is in
191         * progress, otherwise all partitioned instances would be locked
192         * in the calling transaction, preventing application code from
193         * establishing a write lock on instances on the active node, or
194         * establishing a read lock on instances on replica nodes.
195         * <p>
196         * The default is defined by {@link #DefaultObjectsLockedPerTransaction}
197         * <p>
198         * If objectsLockedPerTransaction is set to zero, all instances will
199         * be processed in the caller's transaction.
200         * <p>
201         * If the calling transaction has one or more partitioned
202         * instances locked in the transaction,
203         * objectsLockedPerTransaction is ignored, and all work is done
204         * in the caller's transaction. To insure that migrate() or update()
205         * minimizes the number of locks taken, they should be run in separate
206         * transactions that have no partitioned instances locked. 
207         * <p>
208         * The value passed into definePartition() is stored and used in
209         * failover. The value passed to migrate() and update() override
210         * the value passed to definePartition().
211         * 
212         * @param objectsLockedPerTransaction Number of objects locked
213         *  per transaction when sending data to remote nodes.
214         * @exception IllegalArgumentException The objectsLockedPerTransaction
215         *  value was negative.
216         */
217        public void setObjectsLockedPerTransaction(
218            long objectsLockedPerTransaction)
219        {
220            if (objectsLockedPerTransaction < 0)
221            {
222                throw new IllegalArgumentException(
223                    "invalid objectsLockedPerTransaction: " +
224                    objectsLockedPerTransaction);
225            }
226            m_objectsLockedPerTransaction = objectsLockedPerTransaction;
227        }
228
229        /**
230         * Get the current objectsLockedPerTransaction property value.
231         * @return long containing value.
232         */
233        public final long getObjectsLockedPerTransaction()
234        {
235            return m_objectsLockedPerTransaction;
236        }
237
238        //
239        // package private constructor
240        //
241        Properties(boolean forceReplication, long objectChunks)
242        {
243            m_restoreFromNode = null;
244            m_forceReplication = forceReplication;
245            m_objectsLockedPerTransaction = objectChunks;
246        }
247
248        String m_restoreFromNode;
249        boolean m_forceReplication;
250        long m_objectsLockedPerTransaction;
251    }
252
253    /** Index to the active node in the nodes array */
254    public final static int     ActiveNodeIndex = 0;
255
256    /**
257     * The default number of objects locked per transaction when
258     * performing a migrate or update.
259     */
260    public final static long    DefaultObjectsLockedPerTransaction = 1000;
261
262    /**
263     * Get the partition name.
264     * @return Partition name.
265     */
266    public final String getName()
267    {
268        return m_name;
269    }
270
271    /**
272     * Get the active node for the partition.
273     * @return Active node name.
274     */
275    public final String getActiveNode()
276    {
277        assert( m_nodes.length >= 1 );
278        return m_nodes[ActiveNodeIndex];
279    }
280
281    /**
282     * Get the replica node list for the partition.
283     * @return Array of replica node names.
284     */
285    public final String [] getReplicaNodes()
286    {
287        assert( m_nodes.length >= 1 );
288        String [] replicas = new String[m_nodes.length - 1];
289        for (int i = 0; i < m_nodes.length - 1; i++)
290        {
291            replicas[i] = m_nodes[i + 1];
292        }
293        return replicas;
294    }
295
296    /**
297     * Get the complete node list for the partition.
298     * <p>
299     * The node list is a String array of node names, with the active
300     * node being at ActiveNodeIndex, followed by a prioritized list of
301     * replica nodes.
302     * @return Array of node names.
303     */
304    public final String [] getNodeList()
305    {
306        return m_nodes;
307    }
308
309    /**
310     * Get the current state for the partition.
311     * @return Current state of partition.
312     */
313    public final State getCurrentState()
314    {
315        return m_currentState;
316    }
317
318    /**
319     * Get the ReplicaNode instance for the given replica node index.
320     * @param idx Index into the ReplicaNode array.
321     * @return ReplicaNode instance.
322     * @exception IndexOutOfBoundsException
323     *  The idx isn't valid.
324     */
325    public final ReplicaNode getReplicaNode(int idx)
326        throws IndexOutOfBoundsException
327    {
328        // m_nodes[0] is active node, so we need to disallow -1
329        if (idx < 0) throw new IndexOutOfBoundsException(Integer.toString(idx));
330        return new ReplicaNode(m_nodes[idx + 1],
331            PartitionManager.mapDEReplicationType(
332                m_replicationTypes[idx + 1]));
333    }
334
335    /**
336     * Get the ReplicaNode instance for the given replica node name.
337     * @param nodeName Name of replica node.
338     * @return ReplicaNode instance, or null if not found.
339     * @exception IllegalArgumentException The nodeName was invalid.
340     */
341    public final ReplicaNode getReplicaNode(final String nodeName)
342        throws IllegalArgumentException
343    {
344        if (nodeName.isEmpty())
345        {
346            throw new IllegalArgumentException("nodeName cannot be empty");
347        }
348        for (int idx = 1; idx < m_nodes.length; idx++)
349        {
350            if (m_nodes[idx].equals(nodeName))
351            {
352                return getReplicaNode(idx - 1);
353            }
354        }
355        return null;
356    }
357
358    /**
359     * Get the last time the state for the partition was updated
360     * @return Date of last state change for partition.
361     */
362    public final Date getLastStateChangeTime()
363    {
364        return m_lastUpdated;
365    }
366
367    /**
368     * Get the properties currently defined for the partition.
369     * <p>
370     * The restoreFromNode property is not stored in the runtime since it
371     * has no meaning outside of the initial definePartition(). So this method
372     * will return a Properties instance with a null restoreFromNode value.
373     *
374     * @return Properties instance for partition.
375     */
376    public final Properties getProperties()
377    {
378        return new Properties(m_forceReplication, m_objectChunks);
379    }
380
381    /**
382     * Use {@link #update(Properties)} instead.
383     */
384    @Deprecated
385    public final void update() throws NodeMismatch, NotActiveNode
386    {
387        update(null);
388    }
389
390    /**
391     * Update the partition.
392     * <p>
393     * This method is used to re-partition all instances that exist in a
394     * partition. For each instance in the partition, the {@link
395     * PartitionMapper} defined for that type is accessed, and the
396     * instance re-assigned to the partition that the {@link
397     * PartitionMapper#getPartition} method returns. The update() method
398     * must be called on the current active node for the partition.
399     * <p>
400     * To split a partition, the applications should create the new
401     * partition, install a new PartitionMapper for all types managed by
402     * the partition, and call the update() method for the existing
403     * partition.
404     * <p>
405     * To merge two or more partitions, the applications should install a
406     * new PartitionMapper for all types managed by the partition(s), and
407     * call the update() method for all the partitions that need to
408     * be merged.
409     * <p>
410     * It is important that all partitions returned when executing the
411     * PartitionMapper's getPartition() call for a type all contain
412     * identical node lists. If the partition returned contains a
413     * different node list than this partition, a NodeMismatch exception
414     * is thrown, and the update() terminates. To fix this, perform a
415     * migrate() on all partitions that will be split or merged to insure
416     * that they have identical node lists before performing an update().
417     *
418     * @param partitionProperties Optional properties for the partition.
419     *  If null, the default property values are used.
420     * 
421     * @exception NodeMismatch
422     *  The re-partitioning of instances was done using partitions that
423     *  have different node lists.
424     * @exception NotActiveNode
425     *  The current node is not the active node for the partition.
426     */
427    public final void update(final Properties partitionProperties)
428            throws NodeMismatch, NotActiveNode
429    {
430        DEProperties props = new DEProperties();
431
432        if (partitionProperties == null)
433        {
434            props.forceReplication = false;
435            props.objectChunks = Partition.DefaultObjectsLockedPerTransaction;
436        }
437        else
438        {
439            props.forceReplication = partitionProperties.m_forceReplication;
440            props.objectChunks =
441                partitionProperties.m_objectsLockedPerTransaction;
442        }
443
444        PartitionManager.updatePartition(m_name, props);
445    }
446
447    /**
448     * Use {@link #migrate(Properties, String, ReplicaNode [])} instead.
449     */
450    @Deprecated
451    public final void migrate(String [] nodes)
452        throws NotActiveNode, IllegalArgumentException
453    {
454        PartitionManager.validateNodeList(nodes);
455        DEReplicationType [] drl = new DEReplicationType[nodes.length];
456        for (int i = 0; i < drl.length; i++)
457        {
458            drl[i] = DEReplicationType.DataSynchronous;
459        }
460
461        DEProperties props = new DEProperties();
462        props.forceReplication = false;
463        props.objectChunks = Partition.DefaultObjectsLockedPerTransaction;
464
465        PartitionManager.migratePartition(m_name, props, nodes, drl);
466    }
467
468    /**
469     * Migrate the partition.
470     * <p>
471     * This method is used to migrate all instances that exist in a
472     * partition. For each instance in the partition, the object is
473     * migrated as needed to all nodes in the replicas array, and to the
474     * new activeNode if it is different from the current active node. This
475     * method must be called on the currently active node for the partition.
476     * <p>
477     * Any properties passed in only apply to the current migrate() command,
478     * properties passed into definePartition() are saved and used during
479     * failover. 
480     * 
481     * @param partitionProperties Optional properties for the partition.
482     *  If null, the default property values are used.
483     * @param activeNode Active node after migrate completes.
484     * @param replicas An ordered list of replica nodes for the partition.
485     *  Should be passed in as a null instance or a zero length array if
486     *  no replicas exists.
487     *
488     * @exception NotActiveNode
489     *  The current node is not the active node for the partition.
490     * @exception IllegalArgumentException
491     * The activeNode or replica array was invalid.
492     */
493    public final void migrate(
494        final Properties partitionProperties,
495        String activeNode,
496        ReplicaNode [] replicas)
497            throws NotActiveNode, IllegalArgumentException
498    {
499        PartitionManager.validateReplicaList(replicas);
500
501        int nodeLength = (replicas == null) ? 1 : replicas.length + 1;
502        String [] nodes = new String[nodeLength];
503
504        nodes[0] = activeNode;
505        if (replicas != null )
506        {
507            for (int i = 0; i < replicas.length; i++)
508            {
509                assert( i + 1 < nodes.length );
510                nodes[i + 1] = replicas[i].nodeName;
511            }
512        }
513        PartitionManager.validateNodeList(nodes);
514
515        DEReplicationType [] drl = new DEReplicationType[nodes.length];
516        drl[0] = DEReplicationType.DataSynchronous;
517        if (replicas != null)
518        {
519            for (int i = 0; i < replicas.length; i++)
520            {
521                assert( i + 1 < nodes.length );
522                drl[i + 1] = PartitionManager.mapReplicationType(
523                    replicas[i].replicationType);
524            }
525        }
526        boolean forceReplication = (partitionProperties != null)
527            ? partitionProperties.m_forceReplication : false;
528
529        DEProperties props = new DEProperties();
530        if (partitionProperties == null)
531        {
532            props.forceReplication = false;
533            props.objectChunks = Partition.DefaultObjectsLockedPerTransaction;
534        }
535        else
536        {
537            props.forceReplication = partitionProperties.m_forceReplication;
538            props.objectChunks =
539                partitionProperties.m_objectsLockedPerTransaction;
540        }
541        PartitionManager.migratePartition(m_name, props, nodes, drl);
542    }
543
544    /**
545     * Associated a notifier with the partition.
546     * <p>
547     * This method is used to associate a user defined {@link PartitionNotifier}
548     * instance with a partition.
549     * <p>
550     * PartitionNotifier instances are local to a node, this method
551     * should be executed on all nodes which need to determine if a
552     * partition state change occurs.
553     * <p>
554     * The same PartitionNotifier instance can be associated with multiple
555     * partitions. Multiple PartitionNotifier instances can be installed for
556     * a given Partition. When a state change occurs, all instances are
557     * executed, no order is guaranteed when executing the notifiers.
558     * To remove a notifier, the PartitionNotifier instance should be deleted.
559     * 
560     * @param partitionNotifier User defined notifier instance.
561     *
562     */
563    public final void setNotifier(PartitionNotifier partitionNotifier)
564    {
565        PartitionManager.setNotifier(m_name, partitionNotifier);
566    }
567
568    /** The name of the partition */
569    private final String         m_name;
570
571    /** The list of nodes for a partition */
572    private final String []      m_nodes;
573
574    /** The list of nodes for a partition */
575    private final DEReplicationType [] m_replicationTypes;
576
577    /** The current state of the partition */
578    private final State          m_currentState;
579
580    /** The time it was updated */
581    private final Date          m_lastUpdated;
582
583    /** stored property */
584    private final boolean       m_forceReplication;
585
586    /** stored property */
587    private final long          m_objectChunks;
588
589    //
590    // FIX THIS: Add stats attributes...
591    //
592
593    //
594    // Package private constructor
595    //
596    Partition(
597        String name,
598        String [] nodes,
599        DEReplicationType [] replicationTypes,
600        State currentState,
601        Date lastUpdated,
602        boolean forceReplication,
603        long objectChunks)
604    {
605        this.m_name = name;
606        this.m_nodes = nodes;
607        this.m_replicationTypes = replicationTypes;
608        this.m_currentState = currentState;
609        this.m_lastUpdated = lastUpdated;
610        this.m_forceReplication = forceReplication;
611        this.m_objectChunks = objectChunks;
612    }
613    //
614    // Never should be called
615    //
616    private Partition()
617    {
618        this.m_name = null;
619        this.m_nodes = null;
620        this.m_replicationTypes = null;
621        this.m_currentState = null;
622        this.m_lastUpdated = null;
623        this.m_forceReplication = false;
624        this.m_objectChunks = 0;
625    }
626}