To use a custom queue processor, you must create a queue processor class, compile it, add it to the class path, and then configure it. The examples in this topic are part of the adaptive polling sample application included in the LiveCycle Data Services ES samples web application.
To create a custom queue processor class, you must extend the FlexClientOutboundQueueProcessor class. This class is documented in the public LiveCycle Data Services ES Javadoc API documentation. It provides the methods described in the following table:
| Method | Description |
|---|---|
| initialize(ConfigMap properties) | Initializes a new queue processor instance after it is associated with its corresponding FlexClient, but before any messages are enqueued. |
| add(List queue, Message message) | Adds the message to the queue at the desired index, conflates it with an existing message, or ignores it entirely. |
| FlushResult flush(List queue) | Removes messages from the queue to be flushed out over the network. Can contain an optional wait time before the next flush is invoked. |
| FlushResult flush(MessageClient messageClient, List queue) | Removes messages from the queue for a specific MessageClient subscription. Can contain an optional wait time before the next flush is invoked. |
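The conflation behavior described for the add() method can be sketched in isolation. The following is a minimal illustration only: the Quote class and the ConflatingQueueSketch class are hypothetical stand-ins written for this example, not LiveCycle Data Services ES types. In a real queue processor, comparable logic would go in an override of add(List queue, Message message).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an outbound message; not the LCDS Message type. */
final class Quote {
    final String symbol;
    final double price;

    Quote(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }
}

/** Hypothetical sketch of a conflating add(); not part of the LCDS API. */
final class ConflatingQueueSketch {
    /**
     * Replaces a queued quote that has the same symbol as the incoming one,
     * so only the latest value per symbol is kept. Otherwise appends.
     */
    static void add(List<Quote> queue, Quote incoming) {
        for (int i = 0; i < queue.size(); i++) {
            if (queue.get(i).symbol.equals(incoming.symbol)) {
                queue.set(i, incoming); // conflate with the existing entry
                return;
            }
        }
        queue.add(incoming); // no match: enqueue at the end
    }

    public static void main(String[] args) {
        List<Quote> queue = new ArrayList<>();
        add(queue, new Quote("ADBE", 40.0));
        add(queue, new Quote("IBM", 120.0));
        add(queue, new Quote("ADBE", 41.5)); // replaces the first ADBE quote
        System.out.println(queue.size() + " queued, ADBE at " + queue.get(0).price);
    }
}
```

Conflating in add() keeps the queue small for slow clients, at the cost of dropping intermediate values. Whether that trade-off is acceptable depends on the data being published.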
The following example shows the source code for a custom queue processor class that sets the delay time between flushes in its flush(List outboundQueue) method.
package flex.samples.qos;
import java.util.ArrayList;
import java.util.List;
import flex.messaging.client.FlexClient;
import flex.messaging.client.FlexClientOutboundQueueProcessor;
import flex.messaging.client.FlushResult;
import flex.messaging.config.ConfigMap;
import flex.messaging.MessageClient;
/**
* Per client queue processor that applies custom quality of
* service parameters (in this case: delay).
* Custom quality of services parameters are read from the client FlexClient
* instance.
* In this sample, these parameters are set in the FlexClient instance by
* the client application using the flex.samples.qos.FlexClientConfigService
* remote object class.
* This class is used in the per-client-qos-polling-amf channel definition.
*
*/
public class CustomDelayQueueProcessor extends FlexClientOutboundQueueProcessor
{
/**
* Used to store the last time this queue was flushed.
* Starts off with an initial value of the construct time for the
* instance.
*/
private long lastFlushTime = System.currentTimeMillis();
/**
* Driven by configuration, this is the configurable delay time between
* flushes.
*/
private int delayTimeBetweenFlushes;
public CustomDelayQueueProcessor()
{}
/**
* Sets up the default delay time between flushes. This default is used
* if a client-specific
* value has not been set in the FlexClient instance.
*
* @param properties A ConfigMap containing any custom initialization
* properties.
*/
public void initialize(ConfigMap properties)
{
delayTimeBetweenFlushes = properties.getPropertyAsInt("flush-delay", -1);
if (delayTimeBetweenFlushes < 0)
throw new RuntimeException("Flush delay time for CustomDelayQueueProcessor must be a non-negative value.");
}
/**
* This flush implementation delays flushing messages from the queue
* until the configured delay has passed since the last flush.
*
* @param outboundQueue The queue of outbound messages.
* @return An object containing the messages that have been removed
* from the outbound queue to be written to the network, and the
* wait time in milliseconds before the next flush of the outbound queue.
*/
public FlushResult flush(List outboundQueue)
{
int delay = delayTimeBetweenFlushes;
// Read custom delay from client's FlexClient instance
System.out.println("***"+getFlexClient());
FlexClient flexClient = getFlexClient();
if (flexClient != null)
{
Object obj = flexClient.getAttribute("market-data-delay");
if (obj != null)
{
try {
delay = Integer.parseInt((String) obj);
} catch (Exception e) {
// Ignore a non-String or malformed value and keep the default delay.
}
}
}
long currentTime = System.currentTimeMillis();
System.out.println("Flush?" + (currentTime - lastFlushTime) + "<" + delay);
if ((currentTime - lastFlushTime) < delay)
{
// Delaying flush. No messages will be returned at this point
FlushResult flushResult = new FlushResult();
// Don't return any messages to flush.
// And request that the next flush doesn't occur until the rest of the delay has elapsed.
flushResult.setNextFlushWaitTimeMillis((int)(delay -
(currentTime - lastFlushTime)));
return flushResult;
}
else // OK to flush.
{
// Flushing. All queued messages will now be returned
lastFlushTime = currentTime;
FlushResult flushResult = new FlushResult();
flushResult.setNextFlushWaitTimeMillis(delay);
flushResult.setMessages(new ArrayList(outboundQueue));
outboundQueue.clear();
return flushResult;
}
}
public FlushResult flush(MessageClient client, List outboundQueue) {
return super.flush(client, outboundQueue);
}
}
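The timing decision made in the flush(List outboundQueue) method above comes down to simple arithmetic on three values: the last flush time, the current time, and the delay. The following sketch isolates that arithmetic so it can be checked by hand. The FlushTimingSketch class and its method names are hypothetical and exist only for this illustration.

```java
/** Hypothetical helper that mirrors the timing logic of the sample's flush(). */
final class FlushTimingSketch {
    /** True when at least delayMillis have elapsed since the last flush. */
    static boolean shouldFlush(long lastFlushTime, long now, int delayMillis) {
        return (now - lastFlushTime) >= delayMillis;
    }

    /**
     * Wait time to report for the next flush: the remaining part of the
     * delay when the flush is held back, or the full delay after a flush.
     */
    static int nextWaitMillis(long lastFlushTime, long now, int delayMillis) {
        long elapsed = now - lastFlushTime;
        return elapsed < delayMillis ? (int) (delayMillis - elapsed) : delayMillis;
    }

    public static void main(String[] args) {
        long lastFlush = 10_000L;
        int delay = 5_000;

        // 1200 ms after the last flush: hold messages, 3800 ms remain.
        System.out.println(shouldFlush(lastFlush, 11_200L, delay)
                + " " + nextWaitMillis(lastFlush, 11_200L, delay));

        // 5000 ms after the last flush: flush now, then wait a full delay.
        System.out.println(shouldFlush(lastFlush, 15_000L, delay)
                + " " + nextWaitMillis(lastFlush, 15_000L, delay));
    }
}
```

With a 5000 ms delay, a flush attempted 1200 ms after the previous one returns no messages and asks for the next attempt in 3800 ms. An attempt at or after 5000 ms returns everything that is queued.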
A Flex client application calls the following remote object to set the delay time between flushes on CustomDelayQueueProcessor:
package flex.samples.qos;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import flex.messaging.FlexContext;
import flex.messaging.client.FlexClient;
public class FlexClientConfigService
{
public void setAttribute(String name, Object value)
{
FlexClient flexClient = FlexContext.getFlexClient();
flexClient.setAttribute(name, value);
}
public List getAttributes()
{
FlexClient flexClient = FlexContext.getFlexClient();
List attributes = new ArrayList();
Enumeration attrNames = flexClient.getAttributeNames();
while (attrNames.hasMoreElements())
{
String attrName = (String) attrNames.nextElement();
attributes.add(new Attribute(attrName, flexClient.getAttribute(attrName)));
}
return attributes;
}
public class Attribute {
private String name;
private Object value;
public Attribute(String name, Object value) {
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
}
}
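The setAttribute() method stores whatever value the client sends, while the flush() method of CustomDelayQueueProcessor casts the market-data-delay attribute to a String before parsing it. A value that arrives as a number therefore fails the cast, the exception is swallowed, and the default delay is used. The following sketch shows one possible way to accept both forms. It is a variation for illustration, not the sample's code, and the DelayAttributeSketch class and resolveDelay method are hypothetical names.

```java
/** Hypothetical helper; a variation on the sample's attribute parsing. */
final class DelayAttributeSketch {
    /**
     * Resolves the per-client delay from an attribute value of unknown type.
     * Falls back to defaultDelay when the value is missing or unusable.
     */
    static int resolveDelay(Object attribute, int defaultDelay) {
        if (attribute instanceof Number) {
            return ((Number) attribute).intValue();
        }
        if (attribute instanceof String) {
            try {
                return Integer.parseInt(((String) attribute).trim());
            } catch (NumberFormatException e) {
                return defaultDelay; // malformed text: keep the default
            }
        }
        return defaultDelay; // null or unsupported type
    }

    public static void main(String[] args) {
        System.out.println(resolveDelay("2000", 5000));
        System.out.println(resolveDelay(1500, 5000));
        System.out.println(resolveDelay("fast", 5000));
        System.out.println(resolveDelay(null, 5000));
    }
}
```

If the sample's String-only behavior is kept as is, the client application must send the delay as a string.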
You register custom implementations of the FlexClientOutboundQueueProcessor class on a per-channel/endpoint basis. To register a custom implementation, you configure a flex-client-outbound-queue-processor property in a channel definition in the services-config.xml file, as the following example shows:
<channel-definition id="per-client-qos-polling-amf" class="mx.messaging.channels.AMFChannel">
<endpoint url="http://localhost:8400/lcds-samples/messagebroker/qosamfpolling"
class="flex.messaging.endpoints.AMFEndpoint"/>
<properties>
<polling-enabled>true</polling-enabled>
<polling-interval-millis>500</polling-interval-millis>
<flex-client-outbound-queue-processor
class="flex.samples.qos.CustomDelayQueueProcessor">
<properties>
<flush-delay>5000</flush-delay>
</properties>
</flex-client-outbound-queue-processor>
</properties>
</channel-definition>
This example shows how you can also specify arbitrary properties to be passed into the initialize() method of your queue processor class after it has been constructed and has been associated with its corresponding FlexClient instance, but before any messages are enqueued. In this case, the flush-delay value is passed into the initialize() method. This is the default value that is used if a client does not specify a flush delay value.
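The read-with-default-then-validate pattern used in initialize() can be tried without a server. The following sketch assumes a plain Map of strings as a stand-in for the ConfigMap that LiveCycle Data Services ES passes in. The FlushDelayConfigSketch class and readFlushDelay method are hypothetical names used only here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; a Map stands in for the LCDS ConfigMap. */
final class FlushDelayConfigSketch {
    /**
     * Reads the flush-delay property, using -1 to mark a missing value,
     * and rejects anything negative.
     */
    static int readFlushDelay(Map<String, String> properties) {
        String raw = properties.get("flush-delay");
        int delay = (raw == null) ? -1 : Integer.parseInt(raw.trim());
        if (delay < 0) {
            throw new IllegalArgumentException(
                    "flush-delay must be a non-negative value.");
        }
        return delay;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("flush-delay", "5000");
        System.out.println(readFlushDelay(properties));
    }
}
```

Failing at initialization when the property is missing or negative surfaces a configuration mistake at startup rather than at the first flush.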
You then specify the channel in the channels section of your message destination, as the following example shows:
<destination id="market-data-feed">
<properties>
<network>
<subscription-timeout-minutes>0</subscription-timeout-minutes>
</network>
<server>
<max-cache-size>1000</max-cache-size>
<message-time-to-live>0</message-time-to-live>
<durable>true</durable>
<allow-subtopics>true</allow-subtopics>
<subtopic-separator>.</subtopic-separator>
</server>
</properties>
<channels>
<channel ref="per-client-qos-polling-amf"/>
</channels>
</destination>