EventFlow Applications and LiveView: Publishers

A LiveView embedded publisher is a StreamBase application that runs in the context of LiveView and publishes data to one or more tables in a LiveView project. The embedded publisher can push data to an existing table without the table needing to be reconfigured.

A publisher differs from a data-source in that a data-source can only publish to one table (although a table may have multiple data sources). This figure shows a simple data-source model:

This figure shows a simple publisher model:

The important difference between these two models is that the data-source has to be configured within the table configuration to which it sends data. A publisher does not need to be mentioned in a table's lvconf file; the publisher itself specifies what table or tables it sends data to.

This page describes the Embedded Publisher sample. StreamBase Systems recommends that you use this sample as a model for developing your own embedded publishers.

Load and Run the Embedded Publisher Sample

First, if you have not already done so, load the Hello LiveView sample. Follow the steps described here. The embedded publisher sample is based on the Hello LiveView sample, so it is useful to have the Hello LiveView sample available for review.

Now, load and run the "Shows an embedded publisher application" sample. Follow these steps:

  1. Start in StreamBase Studio, in the SB Authoring perspective.

  2. Load the Embedded Publisher sample.

    1. Select FileLoad StreamBase Sample from Studio's top-level menu.

    2. In the Load StreamBase Projects dialog, open the StreamBase LiveView category.

    3. Make sure that the Open the selected sample's README file after importing check box is selected.

    4. Select the sample whose description is Shows an embedded publisher application and press OK.

    The sample loads into Studio with the project name sample_lv-publisher. The README file in the sample's top-level folder contains valuable information on the sample and how it works.

  3. In the Package Explorer view, select the name of the project, right-click, and from the context menu, select Run AsStreamBase LiveView Project. The Console view shows several messages as the LiveView Server compiles the project and starts.

  4. When you see message All tables have been loaded in the Console view, start LiveView Desktop:

    • On Windows, run StartAll ProgramsStreamBase LiveView n.mLiveView Desktop.

    • On Linux, run the following command:

      /opt/streambase/liveview/desktop/liveview &
      
  5. In LiveView Desktop, connect to LiveView Server with FileConnect to Server. Enter your username and the default URL, lv://localhost:10080

Publisher Configuration

Adding an embedded publisher to LiveView requires a LiveView configuration file describing the publisher. In this sample, the lvconf file called ItemsPublisher.lvconf configures the publisher. The root element of this configuration file is <liveview-configuration>, as is true for all LiveView configuration files.

The <publisher> element contains attributes that specify the embedded publisher name, description, and the location of the StreamBase application writes to the LiveView tables. The publisher element of the ItemsPublisher.lvconf file is shown here:

<!-- This file configures an embedded publisher for the Items and Categories tables. -->
    <publisher 
        id="ItemsPublisher"
        description="Publishes to the Items table"
        short-description="Items Table Publisher"
        filename="ItemsPublisher.sbapp"
        sub-folder="ItemsPublisher">
        
        <!-- The LiveView tables this publisher publishes to, in this case Items and Categories. -->
        <tables>
            <table table-ref="Items"></table>
            <table table-ref="Categories"></table>
        </tables>
        
    </publisher>

These attributes of the <publisher> element are:

id

Unique string identifier for the publisher (required).

description

A full description of the publisher (optional).

short-description

Short descriptive name (optional).

filename

The EventFlow application that writes the data to LiveView tables (required).

sub-folder

The sub-folder in the main project folder for the application file filename (required). StreamBase Systems recommends that you store all project applications in a designated sub-folder underneath the project's root directory.

Th <tables> element is a child of the <publisher> element. This element contains one or more <table> child elements . The <table> attribute table-ref is the name of the project table to which the publisher sends data. The <tables> element of the ItemsPublisher.lvconf file is shown here. It will publish to the Items table and the Categories table.

<tables>
   <table table-ref="Items"></table>
   <table table-ref="Categories"></table>
</tables>

Container Connections

The publisher configures the input and output streams for publication in the project tables. When LiveView Server starts it establishes the connections between the project containers so that data is published in the selected tables. You can see how this works by looking at how the project containers are connected. To do this, open a StreamBase command-line prompt. The sbc list -C ItemsPublisher container-connections command shows all container connections to the ItemsPublisher container:

>sbc list -C ItemsPublisher container-connections
container-connections   Categories.DataIn=ItemsPublisher.PublishToCategoriesOut
container-connections   Categories.QueryTheLastGoodRecord=ItemsPublisher.QueryTheLastGoodRecordOut
container-connections   Items.DataIn=ItemsPublisher.PublishToItemsOut
container-connections   Items.QueryTheLastGoodRecord=ItemsPublisher.QueryTheLastGoodRecordOut
container-connections   ItemsPublisher.ServerStatusIn=default.ServerStatus
container-connections   ItemsPublisher.TheLastGoodRecordIn=Categories.TheLastGoodRecordOut
container-connections   ItemsPublisher.TheLastGoodRecordIn=Items.TheLastGoodRecordOut

The container-connections connect destination streams to source streams. So, for example, the statement:

Items.DataIn=ItemsPublisher.PublishToItemsOut

assigns the output stream PublishToItemsOut to the Items table stream DataIn. Every LiveView table receives data from a DataIn stream.

Ease of Table Configuration

An embedded publisher can publish data to tables without those tables being configured to call the publisher. This means that the lvconf files for individual tables do not need to have data-sources individually configured. Compare Items.lvconf from the Hello LiveView sample to Items.lvconf in the publisher sample. In the publisher sample, the Items table gets data from the configured publisher. The table's lvconf file does not have a data-source configured:

<data-table table-space-ref="PersistentTableSpace" id="Items"
            description="Live view of all items, quantity in stock and the last sold price"
            short-description="Quantity and last-sold prices">
  <fields>
    <!-- The publisher sets these fields -->
    <field name="sku" type="string"></field>
    <field name="category" type="string"></field>
    <field name="color" type="string"></field>
    <field name="quantityRemaining" type="int"></field>
    <field name="lastSoldPrice" type="double"></field>
  </fields>
  <primary-key>
    <field ref="sku" />
  </primary-key>
</data-table>

In contrast, the Items table in the Hello LiveView sample receives data from a data-source, in this case, an application called from the project folder ItemsDataSource. The Hello LiveView Items.lvconf file looks like this:

<data-table table-space-ref="DefaultTableSpace" id="Items"
            description="Live view of all items, quantity in stock and the last sold price"
            short-description="Quantity and last-sold prices">
  <fields>
    <field name="sku" type="string"></field> <!-- The application in the data-source tag sets this field -->
    <field name="category" type="string"></field>
    <field name="color" type="string"></field>
    <field name="quantityRemaining" type="int"></field> <!-- see the comment for 'sku' -->
    <field name="lastSoldPrice" type="double"></field> <!-- see the comment for 'sku' -->
  </fields>
  <primary-key>
    <field ref="sku" />
  </primary-key>
<!-- This declares that an output stream from our data source application (defined
     in ItemsDataSource.lvconf) feeds this table. This application is started as
     soon as all LiveView tables have loaded. -->
  <data-sources>
    <data-source>
      <application output-stream="Data" application-ref="ItemsDataSource" />
    </data-source>
  </data-sources>
</data-table>

Persistence and Recovery

LiveView tables can be configured for persistence such that their contents are saved to disk in log-based files and recovered from these files after a crash or server restart. Part of the recovered data is the table’s publisher sequence number. During initialization, an embedded publisher typically retrieves the sequence for each persistent table and resumes publishing at that point, to avoid overwriting previously written, recovered records.

The embedded publisher interface provides two streams for retrieving the publisher sequence number:

  • QueryTheLastGoodRecordOut – An embedded publisher emits a tuple on this output stream to request sequence number information for a specific table. This stream has the following schema:

    • CQSConnectionID (string) – An embedded publisher-provided value that is echoed in the response, allowing the publisher to match requests with responses.

    • PublisherID (string) – The ID of the embedded publisher that was used to publish the existing records to the LiveView table prior to the crash or restart. A null requests sequence number information for all previous publishers of the table.

    • Tablename (string) – The name of the LiveView table for which sequence number information is being requested. A null requests sequence number information for all the tables this publisher is publishing to.

  • TheLastGoodRecordIn – Responses to the query on the last good record requests arrive on this data stream. If the PublisherID was null in the request, a response is received for each publisher of the LiveView table. This stream has the following schema:

    • CQSConnectionID (string) – The value echoed from the query of the last good record.

    • PublisherID (string) – The ID of the publisher for which sequence number information is provided. This value is normally echoed from the query of the last good record. If the PublisherID was null in the request, a response is received for each previous table publisher. A null in this field identifies this as a punctuation tuple, which indicates all recovery information has been returned for the specified table.

    • LowestPublishedSN – The lowest published sequence number available across all the parallel regions comprising the LiveView table.

    • LowestPersistedSN – The lowest persisted sequence number available across all the parallel regions comprising the LiveView table. In recovering after a server restart, an embedded publisher typically resumes publishing from this value.

    • HighestPublishedSN – The highest published sequence number available across all the parallel regions comprising the LiveView table.

    • HighestPersistedSN – The highest persisted sequence number available across all the parallel regions comprising the LiveView table.

    • Tablename (string) – The name of the LiveView table for which sequence number information is provided. This value is normally echoed from the query of the last good record. If the Tablename was null in the request, a response is received for each LiveView table being published to.

The publisher, in requesting last good record information, has the option of specifying:

  • A table name,

  • A publisher ID,

  • Both a table name and a Publisher ID, or

  • Neither a table name nor a publisher ID.

Specifying a null table name in the request retrieves information for all tables being published to, while specifying a null publisher ID requests the last good record for all previous publishers to the table(s).

In response to a “query the last good record” request, LiveView Server returns one or more tuples for each table specified in the request. If the table name is null in the request, the server returns responses for all tables being published to; otherwise it returns responses for just the specified table.

The last response tuple returned by LiveView Server for each table is a punctuation tuple, which does not carry last good record information (all sequence number fields are null) and is identified by a null in the PublisherID field. All tuples returned by the server, including punctuation tuples, have a non-null Tablename field.

Thus, in response to a “query the last good record” request, the publisher should expect either one punctuation tuple, if the Tablename field was non-null in the request, or one punctuation tuple per table the publisher is configured to publish to, if the Tablename field was null in the request. The number of tables the publisher is configured to publish to is equal to the number of top-level fields present in the PublishSchemasIn input stream's schema.

The following example presents a publish scenario followed by the recovery activity for each of the four “query the last good record” request combinations.

Published tuples:

(Publisher A, 1) -> Table-X
(Publisher A, 2) -> Table-Y 
(Publisher B, 3) -> Table-X 
(Publisher B, 4) -> Table-Y 
(Publisher C, 5) -> Table-X 
(Publisher D, 6) -> Table-Y

If publisher sends request with PublisherID == null, Tablename == null, server returns:

(Publisher A, Table-X, 1) 
(Publisher B, Table-X, 3) 
(Publisher C, Table-X, 5) 
(null, Table-X, null) <- punctuation table for Table-X 

(Publisher A, Table-Y, 2) 
(Publisher B, Table-Y, 4) 
(Publisher D, Table-Y, 6) 
(null, Table-Y, null) <- punctuation table for Table-Y

If publisher sends request with PublisherID == Publisher A, Tablename == null, server returns:

(Publisher A, Table-X, 1) 
(null, Table-X, null)

(Publisher A, Table-Y, 2) 
(null, Table-Y, null)

If publisher sends request with PublisherID == null, Tablename == Table-X, server returns:

(Publisher A, Table-X, 1) 
(Publisher B, Table-X, 3) 
(Publisher C, Table-X, 5) 
(null, Table-X, null) <- punctuation table for Table-X

If publisher sends request with PublisherID == Publisher A, Tablename == Table-X, server returns:

(Publisher A, Table-X, 1) 
(null, Table-X, null)

Note

When responding to query the last good record requests for multiple tables (Tablename is null) the response tuples for the tables are generated in parallel and can therefore be interspersed. However, the punctuation tuple is always the last tuple returned for a specific table.

Walk-Through of the ItemsPublisher Adapter

The LiveView embedded publisher sample consists of a StreamBase application, ItemsPublisher.sbapp, whose key component is an embedded input adapter, ItemsPublisher.java. The adapter source code is provided with the sample and is intended to be used as a starting point for LiveView customers creating their own embedded publishers.

Double-click the application name in Package Explorer view to open this application in the EventFlow editor.

The adapter has three input streams and three output streams corresponding to streams of the LiveView-created interface for the publisher. The first input stream, PublishSchemasIn, has two fields of type tuple that convey the schemas of the adapter’s first two output ports. No tuples flow through this stream; it is used strictly to convey schema information, allowing the adapter to set the schemas of its first two output ports, which are used to publish to the Items and Categories tables.

The second input stream, ServerStatusIn, allows the adapter to determine when the LiveView server is ready to accept published tuples. Its single input field, IsReady, contains true when the LiveView server is ready.

The third input stream and third output stream are used to retrieve sequence numbers during recovery, as described above.

The adapter emulates the data source used by the Hello LiveView sample. It creates a fixed set of 33 SKUs for which price and volume information is published to the Items table. The adapter’s Tuples per Second property determines the rate at which tuples are published. To illustrate multi-table publishing, the adapter publishes to a second table, Categories, a mapping of category names to descriptions. A second property, Log Level, controls the adapter’s verbosity of logging.

During initialization, the adapter’s typecheck() method validates its property values and the schemas of its input ports, sets the schemas of its output ports, and creates cached tuples for publishing to the Items and Categories tables. The adapter’s init() method then generates the 33 SKUs used for publishing to the Items table and registers its run() method, which executes once the adapter completes initialization.

The adapter’s processTuple() method is invoked when a tuple is received on one of its input ports. When a server status tuple arrives indicating the LiveView server is ready, the adapter sets a flag that signals its run() method to begin publishing.

The adapter’s run() method executes once initialization is complete. It polls the LiveView server ready flag used to commence publishing, sleeping in between to avoid a CPU spin loop. Once LiveView is ready, the adapter emits a tuple on its QueryTheLastGoodRecordOut port to retrieve the Items table’s sequence number at which to resume publishing. The Items table, being persistent, might contain previously-written, recovered records that shouldn’t be overwritten. The response to the “query the last good request” comes in via a tuple to the adapter’s processTuple() method on its TheLastGoodRecordIn port. The lowest persisted sequence number from this table is saved and subsequently used by the run() method in publishing to the Items table. The adapter’s run() method publishes once to the (static) Categories table a mapping of names to descriptions.

The adapter then begins publishing to the Items table. The adapter publishes at the rate specified by its Tuples per Second property. During each one-second interval, it attempts to spread the publishing of tuples evenly across the remaining time in this interval. For example, if the adapter is configured to publish 10 tuples per second, it sleeps approximately 100 ms between publishing each tuple. The sleep time is adjusted during the interval to account for the time spent doing the actual publishing.

Creating a New Embedded Publisher

Creating an embedded publisher involves the following three steps:

  1. Create an embedded publisher .lvconf file

    This can be accomplished in StreamBase Studio or with any standard text editor. If done in Studio, content assist is available.

  2. Create an embedded publisher interface file.

    The embedded publisher .lvconf file should be placed in the new LiveView project’s top-level directory. The embedded publisher interface file is then generated with the command:

    lv-server compile path-to-lv-project-dir
    
  3. Create a StreamBase application that implements the interface. An embedded publisher application that implements the new interface is created in Studio’s SB Authoring perspective as follows:

    1. The directory containing the new interface file must be on the project’s module search path. Right-click the project folder and select StreamBaseAdd Module to Search Path.

    2. Select FileNewEventFlow Application from the top-level menu.

    3. Select the Implement interfaces check box.

    4. Click Next.

    5. Click Add….

    6. Browse to the interface file created above.

    7. Add one or more StreamBase operators that connect the input and output streams and perform the actual publishing.

Modifying the Embedded Publisher Sample

This section shows you how to modify the embedded publisher sample application by having it publish to a third LiveView table. It assumes you have loaded the sample in Studio, as described above.

  1. Add a new LiveView configuration file for the additional table you intend to publish to. Follow the steps shown here. For example, right-click the project folder in the Package Explorer view, select New StreamBase LiveView Configuration File, and enter MyTable.lvconf in the LiveView Configuration file name field.

  2. Add a data-table tag to the MyTable.lvconf file and enter values for the table-space-ref and id attributes as follows:

    <data-table table-space-ref="PersistentTableSpace" id="MyTable">
          <fields>
            <field name="pkey" type="string" />
            <field name="val" type="string" />
          </fields>
          <primary-key>
            <field ref="pkey"/>
          </primary-key>
    </data-table>
    

    These values represent a minimum configuration for a new table.

  3. Open the embedded publisher LiveView configuration file, ItemsPublisher.lvconf, in the LiveView Configuration File Editor.

  4. In the <tables> section, add a new XML element <table> that references the new table you named in step (1):

    <tables>
         <table table-ref="Items"></table>
         <table table-ref="Categories"></table>
         <table table-ref="MyTable"></table>
    </tables>
    
  5. From a StreamBase command prompt or terminal window, recompile the embedded publisher project using a command like the following:

    lv-server compile <path-to-embedded-publisher-sample>
    

    When the message "LiveView compiled configuration is available at ..." appears on the screen, the project has recompiled successfully.

  6. In Studio, open ItemsPublisher/ItemsPublisher.sbapp, right-click the canvas and select Refresh Project Typecheck Environment (F5).

  7. Click the Definitions tab. In the Manage Interfaces pane, click Add Missing Items… and click OK.

  8. In the Editor tab, connect the fourth output port of the ItemsPublisher input adapter to the PublishToMyTableOut output stream and save the result.

  9. Run the modified sample. Right-click its top-level folder in the Package Explorer and select Run AsStreamBase LiveView Project.

    When LiveView Server starts, the Items table will receive data. If this happens, you have successfully created a new output port and table for the project.

  10. When done, stop LiveView Server in Studio by clicking the red square Terminate button () in the Console View's toolbar.

You can now open the Items Publisher input adapter source module, ItemsPublisher.java, and add code to the run method to publish to the MyTable LiveView table by emitting tuples on the new output port.