Storm Source Code Analysis - Topology Submit - Worker

1 mk-worker

Like the other daemons, the worker is created through the defserverfn macro.
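
For reference, defserverfn is a small macro (a paraphrased sketch of the Storm code of this era; details may differ slightly between versions) that wraps the daemon body so any exception during initialization is logged and the process is halted:

(defmacro defserverfn [name & body]
  `(let [exec-fn# (fn ~@body)]
     (defn ~name [& args#]
       (try-cause
         (apply exec-fn# args#)
         (catch InterruptedException e#
           (throw e#))
         (catch Throwable t#
           (log-error t# "Error on initialization of server " ~(str name))
           (halt-process! 13 "Error on initialization"))))))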

(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
  (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
               " and conf " conf)
  (if-not (local-mode? conf)
    (redirect-stdio-to-slf4j!))
  ;; because in local mode, its not a separate
  ;; process. supervisor will register it in this case
  (when (= :distributed (cluster-mode conf))
    (touch (worker-pid-path conf worker-id (process-pid))))
  (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id) ;;1.1 build worker-data
        ;;1.2 set up the worker heartbeat
        heartbeat-fn #(do-heartbeat worker)
        ;; do this here so that the worker process dies if this fails
        ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
        _ (heartbeat-fn)
        
        ;; heartbeat immediately to nimbus so that it knows that the worker has been started
        _ (do-executor-heartbeats worker)        
        
        executors (atom nil)
        ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
        ;; to the supervisor
        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
        _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))

        ;;1.3 refresh the worker's outbound connections
        refresh-connections (mk-refresh-connections worker)
        _ (refresh-connections nil)
        _ (refresh-storm-active worker nil)
                
        ;;1.4 create the executors
        _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)))
        
        ;;1.5 launch the receive thread, which keeps moving data from the server's listening port into each task's receive queue
        receive-thread-shutdown (launch-receive-thread worker) ;;the return value is the thread's close function
        
        ;;1.6 define the event handler that drains the transfer queue, and create the transfer-thread
        transfer-tuples (mk-transfer-tuples-handler worker)     
        transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)  
               
        ;;1.7 define the worker shutdown function and the worker's operational interfaces
        shutdown* (fn []
                    (log-message "Shutting down worker " storm-id " " assignment-id " " port)
                    (doseq [[_ socket] @(:cached-node+port->socket worker)]
                      ;; this will do best effort flushing since the linger period
                      ;; was set on creation
                      (.close socket))
                    (log-message "Shutting down receive thread")
                    (receive-thread-shutdown)
                    (log-message "Shut down receive thread")
                    (log-message "Terminating messaging context")
                    (log-message "Shutting down executors")
                    (doseq [executor @executors] (.shutdown executor))
                    (log-message "Shut down executors")
                                        
                    ;;this is fine because the only time this is shared is when it's a local context,
                    ;;in which case it's a noop
                    (.term ^IContext (:mq-context worker))
                    (log-message "Shutting down transfer thread")
                    (disruptor/halt-with-interrupt! (:transfer-queue worker))

                    (.interrupt transfer-thread)
                    (.join transfer-thread)
                    (log-message "Shut down transfer thread")
                    (cancel-timer (:heartbeat-timer worker))
                    (cancel-timer (:refresh-connections-timer worker))
                    (cancel-timer (:refresh-active-timer worker))
                    (cancel-timer (:executor-heartbeat-timer worker))
                    (cancel-timer (:user-timer worker))
                    
                    (close-resources worker)
                    
                    ;; TODO: here need to invoke the "shutdown" method of WorkerHook
                    
                    (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port)
                    (log-message "Disconnecting from storm cluster state context")
                    (.disconnect (:storm-cluster-state worker))
                    (.close (:cluster-state worker))
                    (log-message "Shut down worker " storm-id " " assignment-id " " port))
        ret (reify
             Shutdownable
             (shutdown
              [this]
              (shutdown*))
             DaemonCommon
             (waiting? [this]
               (and
                 (timer-waiting? (:heartbeat-timer worker))
                 (timer-waiting? (:refresh-connections-timer worker))
                 (timer-waiting? (:refresh-active-timer worker))
                 (timer-waiting? (:executor-heartbeat-timer worker))
                 (timer-waiting? (:user-timer worker))
                 ))
             )]
    
    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections)
    (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))

    (log-message "Worker has topology config " (:storm-conf worker))
    (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
    ret
    ))

 

1.1 Building worker-data

(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
  (let [cluster-state (cluster/mk-distributed-cluster-state conf)
        storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
        storm-conf (read-supervisor-storm-conf conf storm-id)
        ;;find the executors assigned to this worker from the assignment, plus a SYSTEM_EXECUTOR
        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
        ;;create the disruptor-based buffer queues the worker uses to receive and send messages
        ;;create the disruptor-based transfer-queue
        transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                  :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
        ;;create a receive-queue (a disruptor-queue) for each executor and return an {executor, queue} map
        executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
        ;;an executor may own several tasks and they share one queue, so turn {executor, queue} into {task, queue}
        receive-queue-map (->> executor-receive-queue-map
                               (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
                               (into {}))
        ;;read stormcode.ser (the serialized topology object) stored on the supervisor machine
        topology (read-supervisor-topology conf storm-id)]  
     ;;recursive-map evaluates each value below and builds a new map from the keys and the returned values
     (recursive-map
      :conf conf
      :mq-context (if mq-context
                      mq-context
                      (TransportFactory/makeContext storm-conf)) ;;an already-prepared object implementing IContext
      :storm-id storm-id
      :assignment-id assignment-id
      :port port
      :worker-id worker-id
      :cluster-state cluster-state
      :storm-cluster-state storm-cluster-state
      :storm-active-atom (atom false)
      :executors executors
      :task-ids (->> receive-queue-map keys (map int) sort)
      :storm-conf storm-conf
      :topology topology
      :system-topology (system-topology! storm-conf topology)
      :heartbeat-timer (mk-halting-timer)
      :refresh-connections-timer (mk-halting-timer)
      :refresh-active-timer (mk-halting-timer)
      :executor-heartbeat-timer (mk-halting-timer)
      :user-timer (mk-halting-timer)
      :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
      :component->stream->fields (component->stream->fields (:system-topology <>)) ;;read each stream's fields from ComponentCommon
      :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
      :endpoint-socket-lock (mk-rw-lock)
      :cached-node+port->socket (atom {})
      :cached-task->node+port (atom {})
      :transfer-queue transfer-queue
      :executor-receive-queue-map executor-receive-queue-map
      :short-executor-receive-queue-map (map-key first executor-receive-queue-map) ;;a shorter executor key: [first-task, last-task] becomes just first-task
      :task->short-executor (->> executors         ;;map each task to its shortened short-executor id
                                 (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
                                 (into {})
                                 (HashMap.))
      :suicide-fn (mk-suicide-fn conf)
      :uptime (uptime-computer)
      :default-shared-resources (mk-default-resources <>)
      :user-shared-resources (mk-user-resources <>)
      :transfer-local-fn (mk-transfer-local-fn <>) ;;deliver received messages to each task's receive queue
      :transfer-fn (mk-transfer-fn <>) ;;put processed messages onto the outgoing transfer-queue
      )))
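
recursive-map is a Storm utility macro: conceptually, each value expression is evaluated in order and may refer to the partially built map through the <> placeholder (which is how :system-topology, :task->component, etc. are reused above). A tiny hypothetical illustration of the idea, not the actual macro expansion:

(recursive-map
  :a 1
  :b (inc (:a <>)))
;; => {:a 1, :b 2}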

1.2 Worker Heartbeat

1.2.1 Establish the worker's local heartbeat
do-heartbeat writes the worker heartbeat into the local LocalState store via (.put state LS-WORKER-HEARTBEAT hb false).
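
A rough sketch of do-heartbeat in this era of Storm (paraphrased; field names may differ slightly between versions):

(defn do-heartbeat [worker]
  (let [conf (:conf worker)
        hb (WorkerHeartbeat.
             (current-time-secs)
             (:storm-id worker)
             (:executors worker)
             (:port worker))
        state (worker-state conf (:worker-id worker))]
    ;; write the heartbeat into the worker's LocalState; the supervisor reads it from there
    (.put state LS-WORKER-HEARTBEAT hb false)))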

1.2.2 Sync the worker heartbeat to ZooKeeper so that nimbus knows immediately that the worker has started
do-executor-heartbeats writes the worker heartbeat into ZooKeeper's workerbeats directory via worker-heartbeat!.
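
A rough sketch of do-executor-heartbeats (again paraphrased): it aggregates per-executor stats and writes the worker heartbeat to ZooKeeper through the cluster-state API:

(defn do-executor-heartbeats [worker & {:keys [executors]}]
  ;; when called before the executors exist, heartbeat with empty stats
  (let [stats (if-not executors
                (into {} (map (fn [e] {e nil}) (:executors worker)))
                (->> executors
                     (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                     (apply merge)))
        zk-hb {:storm-id (:storm-id worker)
               :executor-stats stats
               :uptime ((:uptime worker))
               :time-secs (current-time-secs)}]
    ;; write the heartbeat under the workerbeats node in ZooKeeper
    (.worker-heartbeat! (:storm-cluster-state worker)
                        (:storm-id worker) (:assignment-id worker) (:port worker)
                        zk-hb)))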

1.2.3 Schedule timers to refresh both the local and the ZooKeeper heartbeats periodically

(schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
(schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors))

 

1.3 Maintaining and refreshing the worker's outbound connections

mk-refresh-connections defines and returns an anonymous function that names itself this; we have seen this pattern before — it is needed because the function refers to itself inside its own body.
refresh-connections also has to run repeatedly: every time assignment-info changes, it must refresh once.
So timer.schedule-recurring is not a good fit here, since the trigger is not time-based; instead the ZooKeeper callback (watcher) mechanism is used.

The supervisor's mk-synchronize-supervisor and the worker's mk-refresh-connections use a similar mechanism:
a. both must be triggered whenever the assignment changes, so both rely on ZooKeeper watchers
b. both register themselves as the callback when fetching the assignment, hence the (fn this []) form
c. both run the callback in the background because it is relatively expensive; mk-synchronize-supervisor uses an event-manager while mk-refresh-connections uses a timer
The two differ: the timer is backed by a priority queue, so it is more flexible and supports delays, whereas the event-manager is a plain FIFO queue.
Also, the event-manager wraps its interface with reify and returns a record, which is a somewhat more elegant implementation than the timer.

First, if no callback is supplied, (schedule (:refresh-connections-timer worker) 0 this) is used as the callback.
Then (.assignment-info storm-cluster-state storm-id callback) registers the callback while reading the assignment, so whenever the assignment changes, an "execute this immediately" event is pushed onto refresh-connections-timer.
This guarantees that every assignment change causes the timer to run refresh-connections in the background.
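
As a standalone illustration of the (fn this ...) self-reference pattern (a hypothetical example, not Storm code), the zero-arg arity calls the one-arg arity through the name this:

(def greet
  (fn this
    ([] (this "world"))          ;; no argument given: fall back to a default
    ([who] (str "hello " who))))

(greet)         ;; => "hello world"
(greet "storm") ;; => "hello storm"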

(defn mk-refresh-connections [worker]
  (let [outbound-tasks (worker-outbound-tasks worker) ;;a. find which component tasks this worker sends data to (the to-tasks)
        conf (:conf worker)
        storm-cluster-state (:storm-cluster-state worker)
        storm-id (:storm-id worker)]
    (fn this
      ([]
        (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) ;;schedule pushes an event onto the timer
      ([callback]
        (let [assignment (.assignment-info storm-cluster-state storm-id callback)
              my-assignment (-> assignment   ;;b. resolve node+port for the to-tasks
                                :executor->node+port
                                to-task->node+port
                                (select-keys outbound-tasks)
                                (#(map-val endpoint->string %)))
              ;; we dont need a connection for the local tasks anymore
              needed-assignment (->> my-assignment  ;;c. exclude local tasks
                                      (filter-key (complement (-> worker :task-ids set))))
              needed-connections (-> needed-assignment vals set)
              needed-tasks (-> needed-assignment keys)
              
              current-connections (set (keys @(:cached-node+port->socket worker)))
              new-connections (set/difference needed-connections current-connections) ;;d. connections to add and to remove
              remove-connections (set/difference current-connections needed-connections)]
              (swap! (:cached-node+port->socket worker) ;;e. create the new connections
                     #(HashMap. (merge (into {} %1) %2))
                     (into {}
                       (dofor [endpoint-str new-connections
                               :let [[node port] (string->endpoint endpoint-str)]]
                         [endpoint-str
                          (.connect
                           ^IContext (:mq-context worker)
                           storm-id
                           ((:node->host assignment) node)
                           port)
                          ]
                         )))
              (write-locked (:endpoint-socket-lock worker)
                (reset! (:cached-task->node+port worker)
                        (HashMap. my-assignment)))
              (doseq [endpoint remove-connections]
                (.close (get @(:cached-node+port->socket worker) endpoint)))
              (apply swap!
                     (:cached-node+port->socket worker)
                     #(HashMap. (apply dissoc (into {} %1) %&))
                     remove-connections)
              
              (let [missing-tasks (->> needed-tasks
                                       (filter (complement my-assignment)))]
                (when-not (empty? missing-tasks)
                  (log-warn "Missing assignment for following tasks: " (pr-str missing-tasks))
                  )))))))

The steps of refresh-connections:

a. Find the tasks in this worker that need to send data to other tasks: outbound-tasks

    worker-outbound-tasks looks up which components the current worker's tasks belong to, then finds those components' target components,

    and finally returns all the tasks of those target components

b. Build the task->node+port mapping for the outbound-tasks: my-assignment

c. Outbound tasks that live in the same worker process need no connection, so they are filtered out, leaving needed-assignment

   :value –> needed-connections , :key –> needed-tasks

d. Compare against the set of connections that are already created and cached, yielding new-connections and remove-connections (see the sketch after this list)

e. Call IContext.connect, (.connect ^IContext (:mq-context worker) storm-id ((:node->host assignment) node) port), to create the new connections and merge them into :cached-node+port->socket

f. Update :cached-task->node+port with my-assignment (combined with :cached-node+port->socket this yields task->socket)

g. Close all remove-connections and remove them from :cached-node+port->socket
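
Step d boils down to two set differences; a minimal sketch with hypothetical endpoint strings:

(require '[clojure.set :as set])

(let [needed-connections  #{"node1/6700" "node2/6701"}
      current-connections #{"node2/6701" "node3/6702"}]
  {:new-connections    (set/difference needed-connections current-connections)    ;; => #{"node1/6700"}
   :remove-connections (set/difference current-connections needed-connections)})  ;; => #{"node3/6702"}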

 

1.4 Creating the executors inside the worker

Each executor is created via (executor/mk-executor worker e); see Storm Source Code Analysis - Topology Submit - Executor.

 

1.5 launch-receive-thread

Launch the receive thread, which keeps moving data from the server's listening port into each task's receive queue.

(defn launch-receive-thread [worker]
  (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
  (msg-loader/launch-receive-thread!
    (:mq-context worker)
    (:storm-id worker)
    (:port worker)
    (:transfer-local-fn worker)
    (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
    :kill-fn (fn [t] (halt-process! 11))))

1.5.1 mq-context
TransportFactory/makeContext creates the context object; depending on the configuration it creates either the local or the ZMQ context.
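
Conceptually, makeContext reads the configured transport plugin class from the storm.messaging.transport setting, instantiates it and calls prepare on it. A Clojure-flavored sketch of that logic (the real TransportFactory is Java and somewhat more involved):

;; sketch only, assuming the 0.9.x pluggable-messaging layout
(defn make-context [storm-conf]
  (let [klass (storm-conf "storm.messaging.transport")          ;; e.g. the ZMQ or local transport class
        ^IContext context (.newInstance (Class/forName klass))]
    (.prepare context storm-conf)
    context))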

1.5.2 transfer-local-fn
Returns a fn that delivers the tuples in a tuple-batch to the receive queue of the executor owning each task.

(defn mk-transfer-local-fn [worker]
  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
        task->short-executor (:task->short-executor worker)
        task-getter (comp #(get task->short-executor %) fast-first)]
    (fn [tuple-batch]
      (let [grouped (fast-group-by task-getter tuple-batch)] ;;group the tuple-batch by executor
        (fast-map-iter [[short-executor pairs] grouped] ;;run the following logic for each entry in grouped
          (let [q (short-executor-receive-queue-map short-executor)]
            (if q
              (disruptor/publish q pairs) ;;publish the tuple pairs onto that executor's receive queue
              (log-warn "Received invalid messages for unknown tasks. Dropping... ")
              )))))))

 

(defn fast-group-by [afn alist]
  (let [ret (HashMap.)]
    (fast-list-iter [e alist] ;;a macro: e is each element of the list, and the body below runs for every element
      (let [key (afn e) ;;use afn(e) as the key
            ^List curr (get-with-default ret key (ArrayList.))] ;;the value is an ArrayList, created on first use
        (.add curr e))) ;;append e to the ArrayList for that key
    ret ))
It groups the elements of alist using afn(elem) as the key and returns a HashMap, so all elements sharing a key can be looked up directly.
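
For example (hypothetical data, grouping [task message] pairs by task id):

(fast-group-by first [[1 "a"] [2 "b"] [1 "c"]])
;; => {1 [[1 "a"] [1 "c"]], 2 [[2 "b"]]}   (a java.util.HashMap of ArrayLists)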

 

(defmacro fast-map-iter [[bind amap] & body]
  `(let [iter# (map-iter ~amap)]  ;;get the map's entrySet and return its iterator
    (while (iter-has-next? iter#)
      (let [entry# (iter-next iter#) 
            ~bind (convert-entry entry#)]
        ~@body
        ))))
For the example above:
bind = [short-executor pairs]
amap = grouped
an entry of grouped maps a short-executor to its list of pairs
It is a streamlined map-iteration macro, and a little hard to read at first.
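
A small hypothetical usage example:

(fast-map-iter [[k v] {"a" 1 "b" 2}]
  (println k "=" v))
;; prints one line per entry, e.g. "a = 1" then "b = 2" (order follows the map's entrySet iterator)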

1.5.3 msg-loader/launch-receive-thread!

a. async-loop creates a thread that runs the loop asynchronously, and starts it.
   The main logic is to bind to the socket port and keep receiving messages;
   when a batch has been collected, it is handed to the receive queues via transfer-local-fn.

b. The thread is already started inside async-loop, so by the time the let finishes the thread is running.
   The return value of this function is interesting: it is actually the thread's close function, and because it closes over the thread, the thread stays alive until it is closed.

(defnk launch-receive-thread!
  [context storm-id port transfer-local-fn max-buffer-size
   :daemon true
   :kill-fn (fn [t] (System/exit 1))
   :priority Thread/NORM_PRIORITY]
  (let [max-buffer-size (int max-buffer-size)
        vthread (async-loop
                 (fn []
                   (let [socket (.bind ^IContext context storm-id port)]
                     (fn []
                       (let [batched (ArrayList.)
                             init (.recv ^IConnection socket 0)]  ;;blocking receive
                         (loop [packet init]
                           (let [task (if packet (.task ^TaskMessage packet))
                                 message (if packet (.message ^TaskMessage packet))]
                             (if (= task -1) ;;received the shutdown command
                               (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                                 (.close socket)
                                 nil )
                               (do
                                 (when packet (.add batched [task message]))
                                 (if (and packet (< (.size batched) max-buffer-size))
                                   (recur (.recv ^IConnection socket 1)) ;;non-blocking receive; the loop ends when there is no more data
                                   (do (transfer-local-fn batched) ;;deliver the batched data to each task's receive queue
                                     0 ))))))))))
                 :factory? true
                 :daemon daemon
                 :kill-fn kill-fn
                 :priority priority)]
    (fn []  ;;the close function for this thread
      (let [kill-socket (.connect ^IContext context storm-id "localhost" port)] ;;create a local client socket to send the kill command
        (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
        (.send ^IConnection kill-socket   ;;send the kill command: task id -1
                  -1
                  (byte-array []))
        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
        (.join vthread)  ;;wait for the thread to finish
        (.close ^IConnection kill-socket)
        (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
        ))))

1.6 mk-transfer-tuples-handler and the transfer-thread

Creates a disruptor event handler that keeps adding packets to the drainer; when the batch ends, each message in the drainer is sent over the connection of its target task.

(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
        drainer (ArrayList.)
        node+port->socket (:cached-node+port->socket worker)
        task->node+port (:cached-task->node+port worker)
        endpoint-socket-lock (:endpoint-socket-lock worker)
        ]
    (disruptor/clojure-handler
      (fn [packets _ batch-end?]
        (.addAll drainer packets)
        (when batch-end?
          (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                  task->node+port @task->node+port]
              ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
              ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)
            
              (fast-list-iter [[task ser-tuple] drainer]
                ;; TODO: consider write a batch of tuples here to every target worker  
                ;; group by node+port, do multipart send              
                (let [node-port (get task->node+port task)]
                  (when node-port
                    (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
                    ))))
          (.clear drainer))))))

 

Summary

The diagram from the original post (placeholder below) shows fairly clearly what a worker does:
1. Adjust or create send connections according to assignment changes
2. Create the executors' input and output queues
3. Create the worker's receive thread and transfer thread
4. Create the executors according to the assignment

Inter-thread communication uses Disruptor, while inter-process communication uses ZMQ.

(figure: worker internals diagram, not reproduced here)


Reposted from: http://www.cnblogs.com/fxjwind/p/3208497
