Former-commit-id:a02aeb236c
[formerly9f19e3f712
] [formerlya02aeb236c
[formerly9f19e3f712
] [formerly06a8b51d6d
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]]] Former-commit-id:06a8b51d6d
Former-commit-id:8e80217e59
[formerly3360eb6c5f
] Former-commit-id:377dcd10b9
271 lines
8.1 KiB
C++
271 lines
8.1 KiB
C++
/*
|
|
*
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*
|
|
*/
|
|
|
|
/**
|
|
* topic_publisher.cpp:
|
|
*
|
|
* This program is one of three programs designed to be used
|
|
* together. These programs implement a publish-subscribe example
|
|
* using the "amq.topic" exchange. In the example multiple listeners
|
|
* can subscribe to the same queues for TTL messages.
|
|
* The TTL messages are all ticker price data. Messages are
|
|
* browsed and therefore shared among the multiple listeners.
|
|
* Messages timeout using TTL so that they don't stay in the queue
|
|
* for too long and fill it up.
|
|
* Local exclusive LVQ are also declared for market data.
|
|
*
|
|
* declare_queues.cpp
|
|
*
|
|
* Declares several non-exclusive queues bound to the amq:topic exchange
|
|
*
|
|
* topic_publisher.cpp
|
|
*
|
|
* Sends messages to the "amq.topic" exchange, using the
|
|
* multipart routing keys for ticker price and market data
|
|
* Ticker messages are sent using a TTL value.
|
|
*
|
|
* topic_listener.cpp (this program)
|
|
*
|
|
* Subscribes to non-exclusive queues in NOT_ACQUIRE mode for
|
|
* ticker price data and declares two LVQs for market data.
|
|
*
|
|
* Multiple listeners can be run at the same time.
|
|
*
|
|
*/
|
|
|
|
|
|
#include <qpid/client/Connection.h>
|
|
#include <qpid/client/Session.h>
|
|
#include <qpid/client/AsyncSession.h>
|
|
#include <qpid/client/Message.h>
|
|
#include "qpid/client/QueueOptions.h"
|
|
|
|
|
|
#include <stdlib.h>
|
|
#include <cstdlib>
|
|
#include <iostream>
|
|
#include <set>
|
|
#include <sstream>
|
|
|
|
using namespace qpid::client;
|
|
using namespace qpid::framing;
|
|
|
|
using std::stringstream;
|
|
using std::string;
|
|
|
|
class Publisher {
|
|
private:
|
|
Session& session;
|
|
int ttl_time;
|
|
unsigned long seq;
|
|
|
|
unsigned short high_[6];
|
|
unsigned short low_[6];
|
|
unsigned long shares_[6];
|
|
unsigned long volume_[6];
|
|
QueueOptions args;
|
|
|
|
public:
|
|
Publisher( Session& session,
|
|
const int ttl_time,
|
|
const unsigned long shares[6]);
|
|
|
|
virtual void publish_ticker(const std::string queue, unsigned short& curr_price);
|
|
virtual void publish_market(const std::string queue, unsigned short& curr_price, int i);
|
|
virtual ~Publisher() { };
|
|
};
|
|
|
|
Publisher::Publisher(Session& session, int ttl_time, const unsigned long shares[6]) :
|
|
session(session),
|
|
ttl_time(ttl_time),
|
|
seq(0)
|
|
{
|
|
for (unsigned short i=0; i < 6; i++) {
|
|
high_[i] = 0;
|
|
low_[i] = 9999;
|
|
volume_[i] = 0;
|
|
shares_[i] = shares[i];
|
|
}
|
|
}
|
|
|
|
|
|
void Publisher::publish_ticker(const std::string symbol, unsigned short& curr_price)
|
|
{
|
|
Message message;
|
|
|
|
// Set the routing key once, we'll use the same routing key for all
|
|
// messages.
|
|
|
|
std::string routing_key = "TICKER." + symbol;
|
|
std::cout << "Setting routing key:" << routing_key << std::endl;
|
|
message.getDeliveryProperties().setRoutingKey(routing_key);
|
|
|
|
// Randomally generate some price flucuations
|
|
bool mvmnt;
|
|
unsigned short change = rand() % 3;
|
|
if (rand() % 2 == 0)
|
|
{
|
|
mvmnt = true;
|
|
curr_price += change;
|
|
}
|
|
else
|
|
{
|
|
mvmnt = false;
|
|
curr_price = (curr_price - change)>0 ? (curr_price - change) : 0;
|
|
}
|
|
|
|
// Was there change in price or no change ?
|
|
std::string movement;
|
|
if (!change)
|
|
{
|
|
movement = "] [--]";
|
|
} else
|
|
{
|
|
movement = (mvmnt ? "] [UP]" : "] [DOWN]");
|
|
}
|
|
|
|
stringstream ticker_data;
|
|
// Build up the ticker info
|
|
ticker_data << "[TICKER] " << "Symbol:" << symbol << " \tPrice[" << curr_price << "] \t["
|
|
<< change << movement;
|
|
|
|
message.setData(ticker_data.str());
|
|
// Set TTL value so that message will timeout after a period and be purged from queues
|
|
message.getDeliveryProperties().setTtl(ttl_time);
|
|
// Asynchronous transfer sends messages as quickly as
|
|
// possible without waiting for confirmation.
|
|
async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
|
|
|
|
}
|
|
|
|
void Publisher::publish_market(const std::string symbol, unsigned short& curr_price, int i)
|
|
{
|
|
Message message;
|
|
|
|
// Set the routing key
|
|
std::string routing_key = "MRKT." + symbol;
|
|
std::cout << "Setting routing key:" << routing_key << std::endl;
|
|
message.getDeliveryProperties().setRoutingKey(routing_key);
|
|
|
|
// Calculate the market data low/hi change, vol, market cap etc.
|
|
if (curr_price < low_[i] || low_[i] == 0)
|
|
{
|
|
low_[i] = curr_price;
|
|
}
|
|
else if (curr_price > high_[i] || high_[i] == 9999)
|
|
{
|
|
high_[i] = curr_price;
|
|
}
|
|
|
|
volume_[i] += rand() % 1000; // increase the daily volume tracker
|
|
int mkt_cap = shares_[i] * curr_price; // calculate new market cap based on current price
|
|
|
|
stringstream market_data;
|
|
// Build up the ticker info
|
|
market_data << "[MARKET] " << "Symbol:" << symbol << "\tVolume: " << volume_[i]
|
|
<< "\tHi:" << high_[i] << "\tLo:" << low_[i] << "\tMktCap:"
|
|
<< mkt_cap <<"M\tSEQ[" << seq << "]";
|
|
|
|
message.setData(market_data.str());
|
|
|
|
std::string key;
|
|
args.getLVQKey(key);
|
|
message.getHeaders().setString(key, symbol);
|
|
|
|
// Asynchronous transfer sends messages as quickly as
|
|
// possible without waiting for confirmation.
|
|
async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
|
|
seq++; // This sequence number is really just to demonstrate the LVQ nature of the queue.
|
|
// You will notice some messages don't show because they are overwritten by last value.
|
|
|
|
}
|
|
|
|
|
|
int main(int argc, char** argv) {
|
|
unsigned int pub_cycles = argc>1 ? atoi(argv[1]) : 100;
|
|
unsigned int ttl_time = argc>2 ? atoi(argv[2]) : 4000;
|
|
const char* host = argc>3 ? argv[3] : "127.0.0.1";
|
|
int port = argc>4 ? atoi(argv[4]) : 5672;
|
|
std::cout <<"Usage: topic_publisher <pub cycles> <TTL-timeout> <host name/IP> <port>" << std::endl;
|
|
std::cout <<"\tparameters are optional but must be in this order when used." << std::endl;
|
|
|
|
// Set up the stocks symbols and their prices
|
|
std::string symbol[6];
|
|
unsigned short price[6];
|
|
symbol[0] = "NYSE.RHT"; // Red Hat
|
|
symbol[1] = "NYSE.IBM"; // IBM Corp.
|
|
symbol[2] = "NASDAQ.MSFT"; // Microsoft
|
|
symbol[3] = "NASDAQ.CSCO"; // Cisco Systems
|
|
symbol[4] = "NASDAQ.YHOO"; // Yahoo
|
|
symbol[5] = "NASDAQ.GOOG"; // Google
|
|
|
|
// Rough starting values.
|
|
price[0] = rand() % 30 +1;
|
|
price[1] = rand() % 120 +1;
|
|
price[2] = rand() % 20 +1;
|
|
price[3] = rand() % 75 +1;
|
|
price[4] = rand() % 10 +1;
|
|
price[5] = rand() % 323 +1;
|
|
|
|
// Shares oustanding in millions.
|
|
unsigned long shares[6] = {190,1340,8890, 5860, 1390, 314};
|
|
|
|
|
|
Connection connection;
|
|
try {
|
|
connection.open(host, port);
|
|
Session session = connection.newSession();
|
|
|
|
Publisher theFeed(session,ttl_time, shares);
|
|
|
|
//--------- Main body of program --------------------------------------------
|
|
|
|
// Print the opening values for each symbol
|
|
std::cout << std::endl << "Opening values:" << std::endl;
|
|
for (int i=0; i < 6; i++)
|
|
{
|
|
std::cout << symbol[i] << ":" << price[i] << std::endl;
|
|
}
|
|
|
|
// For the duration of the publishing cycles publish
|
|
// ticker and market data for each symbol
|
|
for (unsigned int j=0; j<pub_cycles; j++)
|
|
{
|
|
for (unsigned int i=0; i < 6; i++)
|
|
{
|
|
// for each symbol publish the ticker and the market data
|
|
theFeed.publish_ticker(symbol[i], price[i]);
|
|
theFeed.publish_market(symbol[i], price[i], i);
|
|
}
|
|
}
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
connection.close();
|
|
return 0;
|
|
} catch(const std::exception& error) {
|
|
std::cout << error.what() << std::endl;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
|