question

Upvotes
Accepted
2 2 1 4

Unable to receive dynamic messages

Dear team

The client is developing an API to consume data from our ERT in Cloud service with Real-Time-SDK-2.0.0.L1.java. When they subscribe for update messages, they receive the following status message while loading RICs and are unable to subscribe to all US equity RICs. This does not happen if they subscribe to only one RIC.


StatusMsg

streamId="17"

domain="MarketPrice Domain"

state="Closed / Ok / None / 'Stream closed for batch'"

serviceName="ELEKTRON_DD"

StatusMsgEnd


Could you please advise:

1. How do we subscribe to ERT in Cloud using a Chain RIC like 0#NASCONS.0?

2. Which demo client can refer to when subscribe multiple RICs in our SDK?

3. How do we unsubscribe certain RICs after the API has started?


Please find the client's configuration and code segment below, and let me know if you need any further information.

Thank you.


configuration.xml


<?xml version="1.0" encoding="UTF-8"?>

<system>


<category name="USConfig" description="US登录">

<item name="source" value="0" description="行情源0:云行情,1:专线"></item>

<item name="username" value="xxxxxxx" description="账号"></item>

<item name="password" value="xxxxxx" description="密码"></item>

<item name="clientId" value="xxxxx" description="客户端ID"></item>

<item name="location" value="eu-west-1a" description="源站点"></item>

<item name="keyfile" value="xxxx/test_keystore.jks" description="证书"></item>

<item name="keypasswd" value="xxxxx" description="证书密码"></item>

<item name="USCode" value="0#UNIVERSE.NB" description="美股列表"></item>

<item name="refinitivHost" value="apac-1-t2.streaming-pricing-api.refinitiv.com" description="主机"></item>


</category>


<category name="Disruptor" description="Disruptor">

<item name="queue" value="65536" description="Logon"></item>

<item name="isMutilProducter" value="true" description="Logout"></item>

<item name="consumer" value="4" description="Heartbeat"></item>

</category>


<category name="staticData" description="静态数据相关配置">

<item name="exRightUrl" value="/data/" description="除权信息本地保存的路径"></item>

</category>

</system>








EmaConfig.xml

<Dictionary>

<Name value="Dictionary_1"/>


<!-- dictionaryType is optional: defaulted to ChannelDictionary -->

<!-- possible values: ChannelDictionary, FileDictionary -->

<!-- if dictionaryType is set to ChannelDictionary, file names are ignored -->

<DictionaryType value="DictionaryType::FileDictionary"/>

<RdmFieldDictionaryFileName value="/xxxx/xxx/RDMFieldDictionary"/>

<EnumTypeDefFileName value="/xxxxx/xxx/enumtype.def"/>

</Dictionary>








ConsumerDemo.java

package com.xx.dc.service;


import com.refinitiv.ema.access.Map;

import com.refinitiv.ema.access.*;

import com.refinitiv.ema.rdm.EmaRdm;

import lombok.Data;

import org.dom4j.Document;

import org.dom4j.Element;

import org.dom4j.io.SAXReader;


import java.io.FileInputStream;

import java.io.InputStream;

import java.util.*;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;


public class ConsumerDemo {


/**
 * Holder for the settings read from the user configuration XML.
 *
 * <p>The file is expected to have a root element containing {@code <category name="...">}
 * elements, each containing {@code <item name="..." value="..."/>} elements. Every item is
 * stored under the key {@code "<category name>.<item name>"}.
 *
 * <p>All state is static and backed by a plain {@link HashMap}; nothing here is synchronized.
 */
static class Configuration {

    /** Flattened settings keyed by {@code "<category>.<item>"}. */
    private static java.util.Map<String, String> items = new HashMap<>();

    /**
     * Parses the XML file at {@code path} and adds every named item to {@link #items}.
     * Categories or items without a non-empty {@code name} attribute are skipped.
     *
     * <p>Failures are reported on the console and otherwise ignored, so callers see an
     * empty or partially filled configuration rather than an exception.
     *
     * @param path file system path of the configuration XML
     */
    private static void loadConfig(String path) {
        // try-with-resources guarantees the stream is closed even when parsing fails.
        try (InputStream is = new FileInputStream(path)) {
            Document document = new SAXReader().read(is);
            Element systemElement = document.getRootElement();

            // Iterate as Object so this compiles whether elements() is raw or generic.
            for (Object catObject : systemElement.elements("category")) {
                Element catElement = (Element) catObject;
                String catName = catElement.attributeValue("name");
                if (catName == null || catName.isEmpty()) {
                    continue;
                }

                for (Object itemObject : catElement.elements("item")) {
                    Element itemElement = (Element) itemObject;
                    String itemName = itemElement.attributeValue("name");
                    String value = itemElement.attributeValue("value");
                    if (itemName != null && !itemName.isEmpty()) {
                        items.put(catName + "." + itemName, value);
                    }
                }
            }
        } catch (java.io.FileNotFoundException ex) {
            // FileInputStream never returns null; a missing file surfaces here instead.
            System.out.println("file is not found!");
            ex.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * @param name key in the form {@code "<category>.<item>"}
     * @return the stored value, or the empty string when the key is absent
     */
    public static String getString(String name) {
        String value = items.get(name);
        return (value == null) ? "" : value;
    }

    /**
     * @param name         key in the form {@code "<category>.<item>"}
     * @param defaultValue value returned when the key is absent or its value is empty
     * @return the stored value, or {@code defaultValue}
     */
    public static String getString(String name, String defaultValue) {
        String value = items.get(name);
        if (value != null && value.length() > 0) {
            return value;
        }
        return defaultValue;
    }

    /**
     * @param name key in the form {@code "<category>.<item>"}
     * @return the value parsed as an int, or {@code 0} when absent or not a number
     */
    public static int getInt(String name) {
        return getInt(name, 0);
    }

    /**
     * @param name         key in the form {@code "<category>.<item>"}
     * @param defaultValue value returned when the key is absent or not a number
     * @return the value parsed as an int, or {@code defaultValue}
     */
    public static int getInt(String name, int defaultValue) {
        try {
            return Integer.parseInt(getString(name));
        } catch (NumberFormatException ignored) {
            // Absent or malformed values deliberately fall back to the default.
            return defaultValue;
        }
    }

    /**
     * @param name key in the form {@code "<category>.<item>"}
     * @return {@code true} only when the stored value equals "true" ignoring case
     */
    public static boolean getBoolean(String name) {
        return Boolean.parseBoolean(getString(name));
    }

    /**
     * @param name         key in the form {@code "<category>.<item>"}
     * @param defaultValue value returned when the key is absent or not a number
     * @return the value parsed as a double, or {@code defaultValue}
     */
    public static double getDouble(String name, double defaultValue) {
        try {
            return Double.parseDouble(getString(name));
        } catch (NumberFormatException ignored) {
            // Absent or malformed values deliberately fall back to the default.
            return defaultValue;
        }
    }

    /**
     * Strict variant of {@link #getInt(String)}.
     *
     * @param name key in the form {@code "<category>.<item>"}
     * @return {@code -1} when the key is absent or blank, otherwise the parsed value
     * @throws NumberFormatException when a non-blank value is not a valid int
     */
    public static int parseInt(String name) {
        String value = items.get(name);
        if (value == null) {
            return -1;
        }
        // Parse the trimmed text: the blank check already trims, so " 5 " must not fail.
        String trimmed = value.trim();
        return trimmed.isEmpty() ? -1 : Integer.parseInt(trimmed);
    }

    /**
     * @return the live backing map (not a copy); changes made through it are visible here
     */
    public static java.util.Map getItems() {
        return items;
    }
}


/**
 * Connection and credential settings for the consumer, populated from the
 * {@code USConfig} category of {@link Configuration} at construction time.
 * Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
class LoginBean {

    private String userName;

    private String password;

    private String clientId;

    private String proxyHostName;

    /** {@code "-1"} means that no proxy port is configured. */
    private String proxyPort = "-1";

    private String proxyUserName;

    private String proxyPassword;

    /** Never populated by the constructor. */
    private String proxyDomain;

    /** Never populated by the constructor. */
    private String proxyKrb5Configfile;

    private String host;

    private String port;

    private String location = "us-east";

    /**
     * Reads every setting from {@link Configuration}.
     *
     * <p>{@code proxyPort} and {@code location} are read with an explicit fallback.
     * The one-argument {@code getString} returns {@code ""} for a missing key, which
     * would otherwise silently overwrite the field defaults declared above.
     */
    public LoginBean() {
        this.userName = Configuration.getString("USConfig.username");
        this.password = Configuration.getString("USConfig.password");
        this.clientId = Configuration.getString("USConfig.clientId");
        this.location = Configuration.getString("USConfig.location", "us-east");

        this.proxyHostName = Configuration.getString("USConfig.proxyHostName");
        this.proxyPort = Configuration.getString("USConfig.proxyPort", "-1");
        this.proxyUserName = Configuration.getString("USConfig.proxyUserName");
        this.proxyPassword = Configuration.getString("USConfig.proxyPassword");
        this.host = Configuration.getString("USConfig.refinitivHost");
        // The port is fixed in code rather than read from the configuration.
        this.port = "14002";
    }
}


class StaticDataCallback implements OmmConsumerClient, ServiceEndpointDiscoveryClient {


public final Set<Integer> fids = new HashSet<>();

public static final int nextPageFId = 815;

{

fids.add(800);

fids.add(801);

fids.add(802);

fids.add(803);

fids.add(804);

fids.add(805);

fids.add(806);

fids.add(807);

fids.add(808);

fids.add(809);

/*fids.add(809);

fids.add(809);

fids.add(809);

fids.add(809);*/

fids.add(810);

fids.add(811);

fids.add(812);

fids.add(813);

fids.add(815);

}


@Override

public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {

System.out.println(refreshMsg.name());


if (0 != refreshMsg.state().statusCode()) {

// LogFactory.getOptionLogger().logInfo(refreshMsg.toString());

}


if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) {

for (FieldEntry fieldEntry : refreshMsg.payload().fieldList()) {


if (!fids.contains(fieldEntry.fieldId())) {

continue;

}

String value = fieldEntry.load().toString();

if (nextPageFId == fieldEntry.fieldId()) {

if (US_EMPTY.equalsIgnoreCase(value)) {

staticDataListNextPage = US_EMPTY;

} else {

staticDataListNextPage = fieldEntry.ascii().toString();

}

break;

}

if (US_EMPTY.equalsIgnoreCase(value)) {

continue;

}

String code = fieldEntry.ascii().toString();

//if (!USDataCenter.getInstance().containsKey(code)) {

ricCodes.putIfAbsent(code,Boolean.FALSE);

//}

}

}


lock.lock();

try {

condition.signalAll();

} finally {

lock.unlock();

}


}


@Override

public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {

System.out.println(updateMsg.name());


}


@Override

public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {

System.out.println(statusMsg.name());


}


@Override

public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {


}


@Override

public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {


}


@Override

public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {


}


@Override

public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {


}


@Override

public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {


}

}


/**
 * Callback for the streaming (batch) item subscriptions issued from {@code main}.
 * It only prints what it receives: item names for refreshes and updates, and the
 * full message for status events.
 */
class DynamicDataCallback implements OmmConsumerClient, ServiceEndpointDiscoveryClient {

    /** Prints the item name of the refresh, or a placeholder when no name is set. */
    @Override
    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
        // name() throws when the name is not set, so guard it.
        System.out.println("refreshMsg:" + (refreshMsg.hasName() ? refreshMsg.name() : "<not set>"));
    }

    /** Prints the item name of the update, or a placeholder when no name is set. */
    @Override
    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
        System.out.println("updateMsg:" + (updateMsg.hasName() ? updateMsg.name() : "<not set>"));
    }

    /** Prints the complete status message, including its state text. */
    @Override
    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
        System.out.println("statusMsg:" + statusMsg.toString());
    }

    @Override
    public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
        // Not used for item subscriptions.
    }

    @Override
    public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
        // Not used for item subscriptions.
    }

    @Override
    public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
        // Not used for item subscriptions.
    }

    @Override
    public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {
        // Service discovery results are not consumed here.
    }

    @Override
    public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {
        // Service discovery errors are not consumed here.
    }
}


// Handles returned by registerClient for the chain-page requests; passed to unregister() later.
private List<Long> staticHandles = new ArrayList<>();

// Callback that receives the chain-page responses.
StaticDataCallback staticDataCallback = new StaticDataCallback();

// Callback that receives the responses for the batch item subscriptions.
DynamicDataCallback dynamicDataCallback = new DynamicDataCallback();

// Created here but not referenced anywhere else in the visible code.
ServiceEndpointDiscovery serviceDiscovery = EmaFactory.createServiceEndpointDiscovery();

// EMA Map (com.refinitiv.ema.access.Map, not java.util.Map) filled by createProgramaticConfig().
private Map configDb = EmaFactory.createMap();


// Created in the constructor from the EMA configuration file path.
OmmConsumerConfig ommConsumerConfig;

// Name of the next chain page to request; starts as USConfig.USCode, US_EMPTY ends the walk.
private String staticDataListNextPage;

LoginBean loginBean;

// Stays null until createConsumer() succeeds.
private OmmConsumer consumer;


// lock/condition: main waits on the condition after each chain request; the callback signals it.
private ReentrantLock lock = new ReentrantLock();

private Condition condition = lock.newCondition();


// RIC names collected from the chain pages; the value is always Boolean.FALSE in the visible code.
private ConcurrentHashMap<String,Boolean> ricCodes = new ConcurrentHashMap<>();

// Text that a field's load().toString() is compared against to detect a blank field.
public static final String US_EMPTY = "(blank data)";

private String confPath;

private String emaConfPath;

// Fallback used when no -userConfPath argument is supplied.
private static String DEFAULT_USER_CONFIG_FILE_NAME = "D:\\work\\gitRepo\\sis-gw\\DataCollectionUSAggregator\\conf\\DataCollectionUS\\configuration.xml";

// NOTE(review): identical to DEFAULT_USER_CONFIG_FILE_NAME — presumably meant to point at the EMA config file (e.g. EmaConfig.xml); confirm.
private static String DEFAULT_EMA_CONFIG_FILE_NAME = "D:\\work\\gitRepo\\sis-gw\\DataCollectionUSAggregator\\conf\\DataCollectionUS\\configuration.xml";



public ConsumerDemo(String userConfPath, String emaConfPath) {

if (userConfPath == null || userConfPath.isEmpty())

userConfPath = DEFAULT_USER_CONFIG_FILE_NAME;


if (emaConfPath == null || emaConfPath.isEmpty())

emaConfPath = DEFAULT_EMA_CONFIG_FILE_NAME;


this.confPath = userConfPath;

this.emaConfPath = emaConfPath;

Configuration.loadConfig(userConfPath);

this.staticDataListNextPage = Configuration.getString("USConfig.USCode");

this.ommConsumerConfig = EmaFactory.createOmmConsumerConfig(emaConfPath);

this.loginBean = new LoginBean();

}


/**
 * Entry point: walks the configured chain to collect RIC names, closes the chain
 * streams, subscribes to every collected RIC in batches, and then keeps the process
 * alive so the callbacks can print incoming messages.
 *
 * @param args optional {@code -userConfPath <path>} and {@code -emaConfPath <path>}
 */
public static void main(String[] args) {
    ConsumerDemo demo = createDemo(args);
    if (demo == null) {
        // createDemo reports its own failure and returns null.
        System.out.println("demo could not be created.");
        return;
    }

    demo.createConsumer();
    if (demo.consumer == null) {
        System.out.println("consumer is null.");
        return;
    }

    try {
        if (!walkChain(demo)) {
            return;
        }

        try {
            if (!demo.staticHandles.isEmpty()) {
                demo.unregister(demo.staticHandles);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (demo.ricCodes.isEmpty()) {
            System.out.println("code size is 0!");
            return;
        }

        subscribeInBatches(demo, 1000);

        try {
            Thread.sleep(1000000000);
        } catch (InterruptedException e) {
            // Restore the flag so the interruption stays visible to the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    } finally {
        demo.consumer.uninitialize();
    }
}

/**
 * Requests chain pages one at a time until the next-page name equals {@code US_EMPTY}.
 * Each request waits on {@code demo.condition} for the callback's signal.
 *
 * @return {@code false} when the thread was interrupted while waiting, else {@code true}
 */
private static boolean walkChain(ConsumerDemo demo) {
    while (!US_EMPTY.equalsIgnoreCase(demo.staticDataListNextPage)) {
        // The lock is taken before registering so the callback cannot signal
        // before this thread has started waiting.
        demo.lock.lock();
        try {
            demo.staticHandles.add(demo.consumer.registerClient(
                    EmaFactory.createReqMsg().serviceName("ELEKTRON_DD")
                            .name(demo.staticDataListNextPage),
                    demo.staticDataCallback));
            demo.condition.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // Retrying the same request would loop forever; continue with what was collected.
            e.printStackTrace();
            break;
        } finally {
            demo.lock.unlock();
        }
    }
    return true;
}

/**
 * Subscribes to every collected RIC using batch requests of at most {@code batchSize} items.
 * The batch stream itself is closed by the provider with the status text
 * "Stream closed for batch"; the individual items arrive on their own streams.
 */
private static void subscribeInBatches(ConsumerDemo demo, int batchSize) {
    ElementList batch = EmaFactory.createElementList();
    OmmArray array = EmaFactory.createOmmArray();

    int pending = 0;
    for (String ric : demo.ricCodes.keySet()) {
        array.add(EmaFactory.createOmmArrayEntry().ascii(ric));
        if (++pending == batchSize) {
            sendBatch(demo, batch, array);
            pending = 0;
        }
    }
    if (pending > 0) {
        sendBatch(demo, batch, array);
    }
}

/**
 * Sends one batch request built from {@code array} and resets both containers for reuse.
 * An EMA failure is printed and does not stop the remaining batches.
 */
private static void sendBatch(ConsumerDemo demo, ElementList batch, OmmArray array) {
    try {
        batch.add(EmaFactory.createElementEntry().array(EmaRdm.ENAME_BATCH_ITEM_LIST, array));
        demo.consumer.registerClient(
                EmaFactory.createReqMsg().serviceName("ELEKTRON_DD").payload(batch),
                demo.dynamicDataCallback);
    } catch (OmmException e) {
        e.printStackTrace();
    } finally {
        array.clear();
        batch.clear();
    }
}


/**
 * Builds a {@code ConsumerDemo} from command-line arguments.
 *
 * <p>Recognized options are {@code -userConfPath <path>} and {@code -emaConfPath <path>}.
 * An option given as the last argument without a value yields {@code null} for that path.
 * Unrecognized arguments are skipped.
 *
 * @param args command-line arguments; {@code null} is treated as no arguments
 * @return the demo, or {@code null} when construction failed (the error is printed)
 */
static ConsumerDemo createDemo(String[] args) {
    try {
        String userConfPath = null;
        String emaConfPath = null;
        int argCount = (args == null) ? 0 : args.length;

        // The for-loop increment always advances, so an unknown argument cannot stall parsing.
        for (int i = 0; i < argCount; i++) {
            boolean hasValue = i < argCount - 1;
            if ("-userConfPath".equals(args[i])) {
                userConfPath = hasValue ? args[++i] : null;
            } else if ("-emaConfPath".equals(args[i])) {
                emaConfPath = hasValue ? args[++i] : null;
            }
        }

        return new ConsumerDemo(userConfPath, emaConfPath);
    } catch (Exception e) {
        e.printStackTrace();
    }

    return null;
}



/**
 * Creates the EMA consumer on first use, applying the programmatic channel configuration
 * and, when a proxy host and port are configured, the tunneling proxy settings.
 *
 * @return {@code true} when a consumer exists after the call; {@code false} when creation
 *         threw (the exception is printed and {@code consumer} stays {@code null})
 */
public boolean createConsumer() {
    boolean success = false;
    try {
        if (consumer == null) {
            createProgramaticConfig(configDb);

            OmmConsumerConfig config = ommConsumerConfig.consumerName("Consumer_1")
                    .username(loginBean.getUserName())
                    .password(loginBean.getPassword())
                    .clientId(loginBean.getClientId())
                    .config(configDb);

            String proxyHost = loginBean.getProxyHostName();
            String proxyPort = loginBean.getProxyPort();
            // Compare by value: == on strings only tests reference identity.
            boolean useProxy = proxyHost != null && !proxyHost.isEmpty()
                    && proxyPort != null && !proxyPort.isEmpty()
                    && !"-1".equals(proxyPort);

            if (useProxy) {
                config.tunnelingProxyHostName(proxyHost)
                        .tunnelingProxyPort(proxyPort)
                        .tunnelingCredentialUserName(loginBean.getProxyUserName())
                        .tunnelingCredentialPasswd(loginBean.getProxyPassword())
                        .tunnelingCredentialDomain(null)
                        .tunnelingCredentialKRB5ConfigFile(null);
            }

            consumer = EmaFactory.createOmmConsumer(config);
        }

        success = true;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return success;
}


/**
 * Fills {@code configDb} with a programmatic EMA configuration made of two groups:
 * a ConsumerGroup whose ConsumerList holds "Consumer_1" bound to "Channel_1", and a
 * ChannelGroup whose ChannelList holds "Channel_1" with its connection settings.
 *
 * The three local containers are reused: each one is cleared right after it has been
 * added to its parent, so the order of the add/clear calls must not be changed.
 *
 * @param configDb EMA Map that receives the two group entries
 */
private void createProgramaticConfig(Map configDb)

{

Map elementMap = EmaFactory.createMap();

ElementList elementList = EmaFactory.createElementList();

ElementList innerElementList = EmaFactory.createElementList();


// Consumer section: Consumer_1 uses Channel_1.
innerElementList.add(EmaFactory.createElementEntry().ascii("Channel", "Channel_1"));


elementMap.add(EmaFactory.createMapEntry().keyAscii("Consumer_1", MapEntry.MapAction.ADD, innerElementList));

innerElementList.clear();


elementList.add(EmaFactory.createElementEntry().map("ConsumerList", elementMap));

elementMap.clear();


configDb.add(EmaFactory.createMapEntry().keyAscii("ConsumerGroup", MapEntry.MapAction.ADD, elementList));

elementList.clear();


// Channel section: encrypted channel to the configured host/port with session management enabled.
innerElementList.add(EmaFactory.createElementEntry().ascii("ChannelType", "ChannelType::RSSL_ENCRYPTED"));

innerElementList.add(EmaFactory.createElementEntry().ascii("Host", loginBean.getHost()));

innerElementList.add(EmaFactory.createElementEntry().ascii("Port", loginBean.getPort()));

innerElementList.add(EmaFactory.createElementEntry().intValue("EnableSessionManagement", 1));


elementMap.add(EmaFactory.createMapEntry().keyAscii("Channel_1", MapEntry.MapAction.ADD, innerElementList));

innerElementList.clear();


elementList.add(EmaFactory.createElementEntry().map("ChannelList", elementMap));

elementMap.clear();


configDb.add(EmaFactory.createMapEntry().keyAscii("ChannelGroup", MapEntry.MapAction.ADD, elementList));

}


/**
 * Closes the streams identified by the given handles.
 * Handles are skipped while no consumer exists.
 *
 * @param handles item handles previously returned by {@code registerClient}
 */
public void unregister(List<Long> handles) {
    Iterator<Long> cursor = handles.iterator();
    while (cursor.hasNext()) {
        Long itemHandle = cursor.next();
        if (consumer == null) {
            continue;
        }
        consumer.unregister(itemHandle);
    }
}

}


elektronrefinitiv-realtimeelektron-sdkrrtema-apielektron-message-apichain-ric
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvote
Accepted
11.5k 16 7 10

Hello @Xiaorong.Xu

You and the client can download the "Message API Java HTML Documentation Set (zip file)" which includes the Reference Guide document and all other EMA Java documents on the EMA Java Document page.

The EMA API and the server have their internal heartbeat message which they use to monitor the connection. The application cannot access this heartbeat message but the application can detect the disconnection in various ways. Please see more detail in the following posts:


ahs-doc-download.png (115.7 KiB)
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvote
11.5k 16 7 10

Hello @Xiaorong.Xu

Please be informed that the Status message "Closed / Ok / None / 'Stream closed for batch'" is expected behavior when the application sends the Batch request message to Refinitiv Real-Time. Please see more explanation in this Why `Stream closed for batch` happens? post.

Question 1: How to subscribe to ERT in Cloud using a Chain RIC like 0#NASCONS.0?

Answer:

Is 0#NASCONS.0 a valid RIC? I have tried it and it returns '**The record could not be found', which means the item does not exist or the name is invalid. However, the client can find an explanation of how to subscribe to Chain (and sub-chain) data in the following resources:

Question 2: Which demo client can refer to when subscribing multiple RICs in our SDK?

Answer: The client can refer to EMA Java ex370_MP_Batch example. The other useful resource is EMA Java Batch and View features article.

Question 3: How to unsubscribe certain RICs after API started?

Answer: The application can call OmmConsumer.unregister(<item handle>) function to close the item request. Please see an example code in EMA Java ex300_MP_Close example.


omm-unregister.png (10.6 KiB)
icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvotes
2 2 1 4

Thank you very much, Wasin. Do we have any documentation on the possible error messages the client could receive, for example when a batch registration fails or the connection is lost? That way the client can be prepared for an outage.

In addition is there any guide on how to design auto reconnection on client API please?

Thank you

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Upvote
11.5k 16 7 10

Hello @Xiaorong.Xu

Unfortunately, we do not have those kinds of documents (list of error messages and auto reconnection).

Please be informed that most error messages are generated from the infrastructure side, from the Exchange, or from the Feed. The application should check the item or login stream health from the StatusMsg.OmmState object via the onStatusMsg() callback function.

  • OmmState.DataState: represents item data state.
  • OmmState.StreamState: represents item stream state.
  • OmmState.StatusCode: represents status code.
  • OmmState.statusText(): get status text information

The client can find more detail about OmmState, DataState, StreamState, and StatusCode objects in EMA Java Reference Guide document (<RTSDK Java package>\Java\Ema\Docs\refman folder).

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Thank you Wasin. The latest SDK Real-Time-SDK-2.0.0.L1.java.zip does not contain the document mentioned. Could you please provide the link where we can download it?

Could you please also confirm if we have any heartbeat message in the feed that client can monitor and trigger alert with? Thank you.

Upvotes
2 2 1 4

@wasin.waeosri Thank you very much! Problem resolved!

icon clock
10 |1500

Up to 2 attachments (including images) can be used with a maximum of 5.0 MiB each and 10.0 MiB total.

Click below to post an Idea Post Idea