Apache Storm is an open-source, distributed system for real-time stream processing. It ingests large volumes of fast-arriving data, possibly from multiple sources, processes it, and publishes real-time updates to a downstream data store without storing the stream data itself. It is built for parallel execution and is scalable and fault-tolerant. Typical uses include real-time analytics, online machine learning, and unbounded stream processing. Let's start with its basic terminology.
Apache Storm Basics
Topology : The logic for a real-time application is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job, with one key difference: a MapReduce job eventually finishes, whereas a topology runs until it is killed. A topology is a graph of spouts and bolts that are connected with stream groupings.
Streams : A stream is the core abstraction in Storm: an unbounded sequence of tuples that is created and processed in parallel in a distributed fashion. Each stream declares a schema that names the fields of its tuples, and data flows along the stream from one component of the pipeline to the next, like water from one pipe to another.
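To make "a tuple is a list of values named by the stream's schema" concrete, here is a minimal plain-Java model with no Storm dependency. The class NamedTuple is hypothetical; in Storm itself this role is played by Fields, Values and Tuple, and the lookup below is only similar in spirit to Tuple.getValueByField.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a tuple on a stream: the schema names each position,
// and the tuple carries one value per declared field.
final class NamedTuple {
    private final List<String> fields;  // the stream's declared schema
    private final List<Object> values;  // one value per declared field

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("tuple has " + values.size()
                + " values but the stream declares " + fields.size() + " fields");
        }
        this.fields = List.copyOf(fields);
        this.values = new ArrayList<>(values); // values may legitimately be null
    }

    // Looks a value up by its field name instead of its position.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        NamedTuple tuple = new NamedTuple(
            List.of("id", "user_id", "tnx_id"),
            List.of("18276382", 12345, "Pscj17293Ahe7292847Uxw"));
        System.out.println(tuple.getValueByField("user_id")); // 12345
    }
}
```

Naming positions once per stream is what lets a downstream component ask for a field such as "value" or "user_id" without knowing where it sits in the list.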
Spout : As the name suggests, a spout is a source of streams in a topology. Spouts generally read tuples from an external source, typically a queueing broker such as Kestrel, RabbitMQ, or Kafka. Spouts are either reliable or unreliable: a reliable spout replays a tuple if it fails to be processed, whereas an unreliable spout emits a tuple and forgets about it. Storm reports the outcome to a reliable spout by calling its ack or fail method.
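The replay behaviour of a reliable spout can be sketched in plain Java. This is a simplified, hypothetical model (ReliableSpoutModel is not a Storm class): it keeps every emitted message in a pending map keyed by message id until ack or fail is called, and a failed message is queued to be emitted again under the same id.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a reliable spout's bookkeeping; not Storm's API.
final class ReliableSpoutModel {
    private final Deque<String> source = new ArrayDeque<>();    // stands in for Kafka/RabbitMQ
    private final Deque<Long> replayQueue = new ArrayDeque<>(); // ids of failed messages
    private final Map<Long, String> pending = new HashMap<>();  // emitted but not yet acked
    private long nextMsgId = 0;

    ReliableSpoutModel(String... messages) {
        for (String m : messages) {
            source.add(m);
        }
    }

    // Like nextTuple(): replay failed messages first, then read new ones.
    // Returns the message id of the emitted tuple, or -1 if nothing is available.
    long nextTuple() {
        if (!replayQueue.isEmpty()) {
            return replayQueue.poll(); // same id, same payload as before
        }
        String next = source.poll();
        if (next == null) {
            return -1;
        }
        long id = nextMsgId++;
        pending.put(id, next);
        return id;
    }

    // Like ack(msgId): the message was fully processed, so forget it.
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // Like fail(msgId): schedule the message for replay.
    void fail(long msgId) {
        if (pending.containsKey(msgId)) {
            replayQueue.add(msgId);
        }
    }

    String payload(long msgId) { return pending.get(msgId); }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        ReliableSpoutModel spout = new ReliableSpoutModel("a", "b");
        long first = spout.nextTuple();    // emits "a"
        long second = spout.nextTuple();   // emits "b"
        spout.fail(first);                 // "a" failed downstream
        spout.ack(second);                 // "b" was fully processed
        long replayed = spout.nextTuple(); // "a" is emitted again
        System.out.println(spout.payload(replayed) + " pending=" + spout.pendingCount()); // a pending=1
    }
}
```

An unreliable spout would simply skip the pending map and the replay queue, which is cheaper but means a failed message is lost.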
Bolts : All processing of streaming data is implemented in bolts, for example filtering, functions, aggregations, joins, or talking to databases. The output of one bolt can be the input of another. The tuple transformation logic is written in the bolt's execute method.
Figure: Storm Topology Graph Example
Stream groupings : A stream grouping defines how a stream is partitioned among the tasks of the bolt that consumes it. Below are a few important built-in stream groupings:
- Shuffle : Tuples are distributed randomly across the bolt's tasks so that each task receives an equal share of the load.
- LocalOrShuffle : If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping. LocalOrShuffle can outperform Shuffle because tuples that stay inside a worker avoid serialization and network transfer between workers.
- Fields : Tuples are partitioned by the fields specified in the grouping, so tuples with the same values for those fields always go to the same task. For example, grouping on a customerId field sends every tuple for a given customer to the same bolt task.
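The three groupings above can be modelled in a few lines of plain Java. This is a simplified sketch, not Storm's grouping code: GroupingSketch and its methods are hypothetical, shuffle is modelled as a round-robin walk over a randomly ordered task list, and fields grouping as a hash of the field value modulo the number of tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical model of Shuffle, LocalOrShuffle and Fields groupings.
final class GroupingSketch {
    private final List<Integer> shuffled;
    private int index = 0;

    GroupingSketch(List<Integer> targetTasks, long seed) {
        shuffled = new ArrayList<>(targetTasks);
        Collections.shuffle(shuffled, new Random(seed));
    }

    // Shuffle: walk a randomly ordered list of tasks round-robin, so every task
    // receives the same number of tuples over each full pass.
    int shuffle() {
        int task = shuffled.get(index);
        index = (index + 1) % shuffled.size();
        return task;
    }

    // LocalOrShuffle: restrict the candidates to tasks in the same worker, if any.
    static List<Integer> localOrShuffleCandidates(List<Integer> allTasks, List<Integer> localTasks) {
        List<Integer> local = new ArrayList<>(allTasks);
        local.retainAll(localTasks);
        return local.isEmpty() ? allTasks : local;
    }

    // Fields: equal field values (e.g. the same customerId) always map to the same task.
    static int fieldsTaskIndex(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        GroupingSketch grouping = new GroupingSketch(List.of(1, 2, 3), 42L);
        int[] counts = new int[4];
        for (int i = 0; i < 9; i++) {
            counts[grouping.shuffle()]++;
        }
        System.out.println(counts[1] + "," + counts[2] + "," + counts[3]); // 3,3,3
        System.out.println(localOrShuffleCandidates(List.of(1, 2, 3, 4), List.of(2, 4))); // [2, 4]
        System.out.println(fieldsTaskIndex("customer-17", 4) == fieldsTaskIndex("customer-17", 4)); // true
    }
}
```

The trade-off is visible in the model: shuffle balances load but scatters related tuples, while fields grouping keeps related tuples together at the risk of uneven load when some keys are much hotter than others.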
Apache Storm Code example
Sample input message on the Kafka topic:
{
"id" : "18276382",
"userId" : "12345",
"tnxId" : "Pscj17293Ahe7292847Uxw"
}
Corresponding record written to Cassandra (snake_case column names):
{
"id" : "18276382",
"user_id" : "12345",
"tnx_id" : "Pscj17293Ahe7292847Uxw"
}
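import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
// Consume ECC_spout_topic from a local broker with String keys and values
final KafkaSpoutConfig<String, String> kafkaSpoutConfig =
KafkaSpoutConfig.builder("localhost:9092", "ECC_spout_topic")
.setProp("group.id", "ECC_spout_CONSUMER")
.setProp("key.deserializer", StringDeserializer.class)
.setProp("value.deserializer", StringDeserializer.class)
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
// Resume from the last committed offset; start from the earliest offset if nothing is committed yet
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
.setMaxUncommittedOffsets(1)
.setOffsetCommitPeriodMs(2000)
.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20)
.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000)
.build();
final KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);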
Kafka Spout properties
- ProcessingGuarantee.AT_LEAST_ONCE : An offset is ready to commit only after the corresponding tuple has been processed and acked, so every record is processed at least once; a failed or timed-out tuple is replayed, which can produce duplicates
- MaxUncommittedOffsets (max.uncommitted.offsets) : maximum number of polled offsets (records) that can be pending commit before another poll can take place. When this limit is reached, no more offsets can be polled until the next successful commit sets the number of pending offsets below the threshold.
- OffsetCommitPeriodMs (offset.commit.period.ms) : how often, in milliseconds, the spout commits offsets to Kafka
- MAX_POLL_RECORDS_CONFIG : The maximum number of records returned in a single call to poll()
- MAX_PARTITION_FETCH_BYTES_CONFIG : The maximum amount of data, in bytes, that the server will return per partition
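How AT_LEAST_ONCE commits and max.uncommitted.offsets interact can be illustrated with a small single-partition model. OffsetCommitSketch is hypothetical and deliberately simplified; it is not the storm-kafka-client implementation. The key idea it shows is that only a contiguous run of acked offsets can be committed, because committing past an unacked record could skip it after a crash.

```java
import java.util.TreeSet;

// Hypothetical single-partition model of at-least-once offset bookkeeping.
final class OffsetCommitSketch {
    private final int maxUncommittedOffsets;
    private final TreeSet<Long> acked = new TreeSet<>();
    private long committed = 0;  // next offset to consume, as stored in Kafka
    private long nextToPoll = 0; // next offset the spout will emit

    OffsetCommitSketch(int maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    // Polling is blocked while too many emitted offsets are still uncommitted.
    boolean canPoll() {
        return nextToPoll - committed < maxUncommittedOffsets;
    }

    long poll() {
        if (!canPoll()) {
            throw new IllegalStateException("max uncommitted offsets reached");
        }
        return nextToPoll++;
    }

    void ack(long offset) {
        acked.add(offset);
    }

    // Runs every commit period: advance only over consecutive acked offsets.
    long commit() {
        while (acked.remove(committed)) {
            committed++;
        }
        return committed;
    }

    public static void main(String[] args) {
        OffsetCommitSketch partition = new OffsetCommitSketch(3);
        long a = partition.poll(), b = partition.poll(), c = partition.poll();
        System.out.println(partition.canPoll()); // false: 3 offsets pending commit
        partition.ack(a);
        partition.ack(c);                        // b is still in flight
        System.out.println(partition.commit());  // 1: unacked b blocks c
        partition.ack(b);
        System.out.println(partition.commit());  // 3
        System.out.println(partition.canPoll()); // true
    }
}
```

In this model a very small limit, such as the setMaxUncommittedOffsets(1) used in the configuration above, means the spout stalls until each record is acked and committed, trading throughput for fewer in-flight records.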
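import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
// JSON payload of a Kafka message; unknown JSON properties are ignored
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Getter
@JsonIgnoreProperties(ignoreUnknown = true)
public class EccProcessorDto {
private String id;
private Integer userId;
private String tnxId;
}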
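import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EccProcessorBolt extends BaseRichBolt {
private static final long serialVersionUID = 173815L;
private static final Logger LOGGER = LoggerFactory.getLogger(EccProcessorBolt.class);
// static, so the mapper is not serialized together with the bolt instance
private static final ObjectMapper MAPPER = new ObjectMapper();
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
LOGGER.info("EccProcessorBolt prepare");
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
LOGGER.info("tuple : {}", tuple);
// The KafkaSpout's default record translator puts the message payload in the "value" field
final String packetData = tuple.getStringByField("value");
EccProcessorDto eccProcessorDto = null;
if (!StringUtils.isEmpty(packetData)) {
try {
eccProcessorDto = MAPPER.readValue(packetData, EccProcessorDto.class);
} catch (Exception e) {
LOGGER.error("error while parsing kafka packet data : {}", packetData, e);
}
}
// Ignore invalid Kafka packets: ack them so they are not replayed, and do not emit
if (eccProcessorDto == null || eccProcessorDto.getId() == null
|| eccProcessorDto.getTnxId() == null || eccProcessorDto.getUserId() == null) {
this.collector.ack(tuple);
return;
}
final List<Object> output = Arrays.asList(eccProcessorDto.getId(), eccProcessorDto.getUserId(), eccProcessorDto.getTnxId());
// Only valid packets are emitted to the Cassandra stream, anchored to the input tuple
collector.emit("ECC_OUTPUT_STREAM", tuple, output);
this.collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// Field names must match the FieldSelector columns used by the Cassandra bolt
declarer.declareStream("ECC_OUTPUT_STREAM", new Fields("id", "user_id", "tnx_id"));
}
}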
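Guaranteeing Message Processing
// Unanchored emit: the new tuple is not linked to the input tuple, so a downstream failure does not cause a replay
collector.emit("ECC_OUTPUT_STREAM", output);
// Anchored emit: the new tuple joins the input tuple's tree, so a downstream failure or timeout makes the spout replay the message
collector.emit("ECC_OUTPUT_STREAM", tuple, output);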
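Storm tracks each spout tuple's tree with an "acker" that keeps a single 64-bit value per spout tuple: the XOR of every tuple id created in or acked from that tree. Because x ^ x = 0, the value returns to zero exactly when every anchored tuple has been acked. The plain-Java sketch below models only that idea; AckerSketch is hypothetical and leaves out Storm's message passing and timeouts. Random 64-bit ids are used because sequential ids could cancel out by accident (for example 1 ^ 2 ^ 3 = 0).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical model of the acker's XOR bookkeeping; not Storm's implementation.
final class AckerSketch {
    private final Map<Long, Long> ackVals = new HashMap<>(); // spout tuple id -> XOR value
    private final Random random = new Random(7);

    // A tuple joins the tree of spout tuple rootId (spout emit or anchored bolt emit).
    long created(long rootId) {
        long tupleId = random.nextLong();
        ackVals.merge(rootId, tupleId, (x, y) -> x ^ y);
        return tupleId;
    }

    // A bolt acks tupleId; returns true once the whole tree has been acked.
    boolean acked(long rootId, long tupleId) {
        long value = ackVals.merge(rootId, tupleId, (x, y) -> x ^ y);
        return value == 0L;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = 1L;                                    // one Kafka record
        long spoutTuple = acker.created(root);             // KafkaSpout emits
        long child = acker.created(root);                  // anchored emit in EccProcessorBolt
        System.out.println(acker.acked(root, spoutTuple)); // false: child not acked yet
        System.out.println(acker.acked(root, child));      // true: Cassandra bolt acked it
    }
}
```

With an unanchored emit the child is never added to the tree, so the processor bolt's ack alone brings the value to zero and a later failure in the Cassandra bolt goes unnoticed by the spout.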
private static BaseCassandraBolt getBaseCassandraBolt() {
// PropertiesReader and CustomExecutionResultHandler are custom helper classes, not part of Storm
final PropertiesReader propertiesReader = new PropertiesReader();
final Map cassandraConfig = propertiesReader.readProperties("cassandra.properties");
// Cassandra columns, which are also the field names declared on ECC_OUTPUT_STREAM
final List<String> attributes = Arrays.asList("id", "user_id", "tnx_id");
final String queryString = "INSERT INTO ECC.ecc_table (" + String.join(",", attributes) + ") values (?,?,?);";
// One FieldSelector per bind marker, in column order: each reads the tuple field with that name
final List<FieldSelector> selectors = new ArrayList<>();
for (String colName : attributes) {
selectors.add(new FieldSelector(colName));
}
CqlMapper.SelectableCqlMapper cqlMapper = new CqlMapper.SelectableCqlMapper(selectors);
final BaseCassandraBolt cassandraWriterBolt = new CassandraWriterBolt(
DynamicStatementBuilder.async(DynamicStatementBuilder.simpleQuery(queryString).with(cqlMapper)))
.withCassandraConfig(cassandraConfig)
.withResultHandler(new CustomExecutionResultHandler())
.withStreamOutputFields("ECC_OUTPUT_STREAM", new Fields(attributes));
return cassandraWriterBolt;
}
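The query above hardcodes three bind markers next to a column list that is built separately, so the two can drift apart if a column is added. A small hypothetical helper (CqlInsertSketch.insertQuery is not part of storm-cassandra) can derive both from the same list; it produces exactly the string the method above uses.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper: builds a CQL INSERT with one "?" per column.
final class CqlInsertSketch {
    static String insertQuery(String table, List<String> columns) {
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(",", columns)
            + ") values (" + placeholders + ");";
    }

    public static void main(String[] args) {
        System.out.println(insertQuery("ECC.ecc_table", List.of("id", "user_id", "tnx_id")));
        // INSERT INTO ECC.ecc_table (id,user_id,tnx_id) values (?,?,?);
    }
}
```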
public static void main(String[] args) {
// Building topology
final TopologyBuilder builder = new TopologyBuilder();
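builder.setSpout("ECC_SPOUT_ID", kafkaSpout, 2); // kafkaSpout built from kafkaSpoutConfig; parallelism_hint = 2 executors
// setNumTasks is not called, so the spout also gets 2 tasks (one per executor)
builder
.setBolt("ECC_PROCESSOR_BOLT", new EccProcessorBolt(), 2) // 2 executors
.localOrShuffleGrouping("ECC_SPOUT_ID")
.setNumTasks(4); // 4 tasks over 2 executors, i.e. 2 tasks per thread
builder
.setBolt("ECC_CASSANDRA_BOLT", getBaseCassandraBolt(), 6) // 6 executors, 6 tasks
.localOrShuffleGrouping("ECC_PROCESSOR_BOLT", "ECC_OUTPUT_STREAM"); // subscribe to the named stream only
// Topology level configs
final Config conf = new Config();
conf.setNumWorkers(1); // a single worker JVM runs every executor
conf.setMessageTimeoutSecs(60); // tuple trees not fully acked within 60 s are failed and replayed by the spout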
// Topology submit command
try {
StormSubmitter.submitTopology("CASSANDRA_CONSUMER_TOPOLOGY", conf, builder.createTopology());
LOGGER.info("CASSANDRA_CONSUMER_TOPOLOGY topology started");
} catch (Exception e) {
LOGGER.error("Error in CASSANDRA_CONSUMER_TOPOLOGY", e);
}
}
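Parallelism in Apache Storm
Storm uses three kinds of entities to run a topology on a cluster:
- Worker processes : JVM processes, each executing a subset of a topology
- Executors (threads) : threads spawned by a worker process, each running one or more tasks of the same component
- Tasks : instances of a spout or bolt that perform the actual data processing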
Figure: Illustration of Worker Process, Executor and Task
- setNumWorkers : How many worker processes to create for the topology across machines in the cluster
- parallelism_hint : the initial number of executor threads for a component (spout/bolt)
- setNumTasks : How many tasks to create per component. By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread.
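The arithmetic behind these three settings can be checked with the numbers from the topology above: 2 spout executors with 2 tasks, 2 processor executors with 4 tasks, and 6 Cassandra executors with 6 tasks, all in one worker. That is 10 user executors running 12 tasks (Storm also starts its own system and acker executors, which are not counted here). The sketch below is a hypothetical model, not Storm's scheduler: it splits a component's tasks into contiguous, nearly equal groups, one per executor, using local task indices rather than Storm's global task ids. It never creates more executors than tasks, since Storm does not run an executor without a task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of spreading a component's tasks over its executor threads.
final class TaskDistributionSketch {
    static List<List<Integer>> assign(int numTasks, int numExecutors) {
        if (numTasks < 1 || numExecutors < 1) {
            throw new IllegalArgumentException("need at least one task and one executor");
        }
        int executors = Math.min(numTasks, numExecutors); // never more threads than tasks
        int base = numTasks / executors;
        int extra = numTasks % executors;
        List<List<Integer>> result = new ArrayList<>();
        int task = 0;
        for (int e = 0; e < executors; e++) {
            int size = base + (e < extra ? 1 : 0); // the first `extra` executors get one more task
            List<Integer> tasks = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                tasks.add(task++);
            }
            result.add(tasks);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("ECC_SPOUT_ID:       " + assign(2, 2)); // [[0], [1]]
        System.out.println("ECC_PROCESSOR_BOLT: " + assign(4, 2)); // [[0, 1], [2, 3]]
        System.out.println("ECC_CASSANDRA_BOLT: " + assign(6, 6)); // [[0], [1], [2], [3], [4], [5]]
    }
}
```

Setting more tasks than executors, as done for ECC_PROCESSOR_BOLT, leaves room to raise the executor count later (for example with storm rebalance) without changing the number of tasks.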