CoordinatedBolt and LinearDRPCTopologyBuilder

src/jvm/backtype/storm/drpc/CoordinatedBolt.java

package backtype.storm.drpc;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.Config;
import java.util.Collection;
import backtype.storm.Constants;
import backtype.storm.generated.Grouping;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import static backtype.storm.utils.Utils.get;
import static backtype.storm.utils.Utils.tuple;


public class CoordinatedBolt implements IRichBolt {
    public static Logger LOG = Logger.getLogger(CoordinatedBolt.class);

    public static interface FinishedCallback {
        void finishedId(Object id);
    }

    public static class SourceArgs implements Serializable {
        public boolean singleCount;

        protected SourceArgs(boolean singleCount) {
            this.singleCount = singleCount;
        }

        public static SourceArgs single() {
            return new SourceArgs(true);
        }

        public static SourceArgs all() {
            return new SourceArgs(false);
        }
        
        @Override
        public String toString() {
            return "<Single: " + singleCount + ">";
        }
    }

    public class CoordinatedOutputCollector extends OutputCollector {
        IOutputCollector _delegate;

        public CoordinatedOutputCollector(IOutputCollector delegate) {
            _delegate = delegate;
        }

        public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
            List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
            updateTaskCounts(tuple.get(0), tasks);
            return tasks;
        }

        public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
            updateTaskCounts(tuple.get(0), Arrays.asList(task));
            _delegate.emitDirect(task, stream, anchors, tuple);
        }

        public void ack(Tuple tuple) {
            _delegate.ack(tuple);
            Object id = tuple.getValue(0);
            synchronized(_tracked) {
                _tracked.get(id).receivedTuples++;
            }
            checkFinishId(id);
        }

        public void fail(Tuple tuple) {
            _delegate.fail(tuple);
            Object id = tuple.getValue(0);
            synchronized(_tracked) {
                _tracked.get(id).receivedTuples++;
            }
            checkFinishId(id);
        }
        
        public void reportError(Throwable error) {
            _delegate.reportError(error);
        }


        private void updateTaskCounts(Object id, List<Integer> tasks) {
            Map<Integer, Integer> taskEmittedTuples = _tracked.get(id).taskEmittedTuples;
            for(Integer task: tasks) {
                int newCount = get(taskEmittedTuples, task, 0) + 1;
                taskEmittedTuples.put(task, newCount);
            }
        }
    }

    private SourceArgs _sourceArgs;
    private String _idComponent;
    private IRichBolt _delegate;
    private Integer _numSourceReports;
    private List<Integer> _countOutTasks = new ArrayList<Integer>();
    private OutputCollector _collector;
    private TimeCacheMap<Object, TrackingInfo> _tracked;

    public static class TrackingInfo {
        int reportCount = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
        boolean receivedId = false;
        
        @Override
        public String toString() {
            return "reportCount: " + reportCount + "\n" +
                   "expectedTupleCount: " + expectedTupleCount + "\n" +
                   "receivedTuples: " + receivedTuples + "\n" +
                   taskEmittedTuples.toString();
        }
    }

    
    public CoordinatedBolt(IRichBolt delegate) {
        this(delegate, null, null);
    }

    public CoordinatedBolt(IRichBolt delegate, SourceArgs sourceArgs, String idComponent) {
        _sourceArgs = sourceArgs;
        _delegate = delegate;
        _idComponent = idComponent;
    }

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        _tracked = new TimeCacheMap<Object, TrackingInfo>(Utils.toInteger(config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
        _collector = collector;
        _delegate.prepare(config, context, new CoordinatedOutputCollector(collector));
        for(String component: Utils.get(context.getThisTargets(),
                                        Constants.COORDINATED_STREAM_ID,
                                        new HashMap<String, Grouping>())
                                        .keySet()) {
            for(Integer task: context.getComponentTasks(component)) {
                _countOutTasks.add(task);
            }
        }
        if(_sourceArgs!=null) {
            if(_sourceArgs.singleCount) {
                _numSourceReports = 1;
            } else {
                Iterator<GlobalStreamId> it = context.getThisSources().keySet().iterator();
                while(it.hasNext()) {
                    String sourceComponent = it.next().get_componentId();
                    if(_idComponent==null || !sourceComponent.equals(_idComponent)) {
                        _numSourceReports = context.getComponentTasks(sourceComponent).size();
                        break;
                    }
                }
            }
        }
    }

    private void checkFinishId(Object id) {
        synchronized(_tracked) {
            TrackingInfo track = _tracked.get(id);
            if(track!=null
                    && track.receivedId 
                    && (_sourceArgs==null
                        ||
                       track.reportCount==_numSourceReports &&
                       track.expectedTupleCount == track.receivedTuples)) {
                if(_delegate instanceof FinishedCallback) {
                    ((FinishedCallback)_delegate).finishedId(id);
                }
                Iterator<Integer> outTasks = _countOutTasks.iterator();
                while(outTasks.hasNext()) {
                    int task = outTasks.next();
                    int numTuples = get(track.taskEmittedTuples, task, 0);
                    _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tuple(id, numTuples));
                }
                _tracked.remove(id);
            }
        }
    }

    public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        synchronized(_tracked) {
            track = _tracked.get(id);
            if(track==null) {
                track = new TrackingInfo();
                if(_idComponent==null) track.receivedId = true;
                _tracked.put(id, track);
            }
        }
        
        boolean checkFinish = false;
        if(_idComponent!=null
                && tuple.getSourceComponent().equals(_idComponent)
                && tuple.getSourceStreamId().equals(PrepareRequest.ID_STREAM)) {
            synchronized(_tracked) {
                track.receivedId = true;
            }
            checkFinish = true;
            _collector.ack(tuple);
        } else if(_sourceArgs!=null
                && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
            int count = (Integer) tuple.getValue(1);
            synchronized(_tracked) {
                track.reportCount++;
                track.expectedTupleCount+=count;
            }
            checkFinish = true;
            _collector.ack(tuple);
        } else {            
            _delegate.execute(tuple);
        }
        if(checkFinish) {
            checkFinishId(id);
        }
    }

    public void cleanup() {
        _delegate.cleanup();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _delegate.declareOutputFields(declarer);
        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
    }

}
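
To make the FinishedCallback hook concrete, here is a minimal sketch of a bolt that CoordinatedBolt could wrap. The PartialCountBolt below is hypothetical (not part of this source): it buffers a partial count per request id and only emits once the wrapper decides the id is complete. The exact IRichBolt method set varies slightly across Storm versions; this follows the interface as used by the code above.

import backtype.storm.drpc.CoordinatedBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

public class PartialCountBolt implements IRichBolt, CoordinatedBolt.FinishedCallback {
    OutputCollector _collector;
    Map<Object, Integer> _counts = new HashMap<Object, Integer>();

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        // when wrapped by CoordinatedBolt, this collector is really a
        // CoordinatedOutputCollector, so every ack and emit below is counted per request id
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        // in a linear DRPC topology, field 0 is always the request id
        Object id = tuple.getValue(0);
        Integer curr = _counts.get(id);
        if(curr==null) curr = 0;
        _counts.put(id, curr + 1);
        _collector.ack(tuple);
    }

    // called by CoordinatedBolt once every tuple for this request id has been accounted for
    public void finishedId(Object id) {
        Integer count = _counts.remove(id);
        if(count==null) count = 0;
        _collector.emit(new Values(id, count));
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}

Because the collector handed to prepare is the wrapping CoordinatedOutputCollector, and checkFinishId calls finishedId before it forwards counts and removes the tracking entry, the emit inside finishedId is still counted and reported downstream over COORDINATED_STREAM_ID.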

src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java

package backtype.storm.drpc;

import backtype.storm.Constants;
import backtype.storm.ILocalDRPC;
import backtype.storm.drpc.CoordinatedBolt.FinishedCallback;
import backtype.storm.drpc.CoordinatedBolt.SourceArgs;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
import backtype.storm.topology.BasicBoltExecutor;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.InputDeclarer;
import backtype.storm.topology.OutputFieldsGetter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;


// need a "final bolt" method that does fields groupings based on the first field of the previous streams.
// PrepareRequest needs to emit to a special stream to indicate which task in the last bolt is responsible for that id?
// -- what if it's shuffle grouping all the way through? need to enforce that the last bolt does a fields grouping on id...
public class LinearDRPCTopologyBuilder {    
    String _function;
    List<Component> _components = new ArrayList<Component>();
    
    
    public LinearDRPCTopologyBuilder(String function) {
        _function = function;
    }
        
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, int parallelism) {
        Component component = new Component(bolt, parallelism);
        _components.add(component);
        return new InputDeclarerImpl(component);
    }
    
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
        return addBolt(bolt, 1);
    }
    
    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, int parallelism) {
        return addBolt(new BasicBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
        return addBolt(bolt, 1);
    }
        
    public StormTopology createLocalTopology(ILocalDRPC drpc) {
        return createTopology(new DRPCSpout(_function, drpc));
    }
    
    public StormTopology createRemoteTopology() {
        return createTopology(new DRPCSpout(_function));
    }
    
    
    private StormTopology createTopology(DRPCSpout spout) {
        final String SPOUT_ID = "spout";
        final String PREPARE_ID = "prepare-request";
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, spout);
        builder.setBolt(PREPARE_ID, new PrepareRequest())
                .noneGrouping(SPOUT_ID);
        int i=0;
        for(; i<_components.size();i++) {
            Component component = _components.get(i);
            
            SourceArgs source;
            if(i==0) {
                source = null;
            } else if (i==1) {
                source = SourceArgs.single();
            } else {
                source = SourceArgs.all();
            }
            String idComponent = null;
            if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {
                idComponent = PREPARE_ID;
            }
            InputDeclarer declarer = builder.setBolt(
                    boltId(i),
                    new CoordinatedBolt(component.bolt, source, idComponent),
                    component.parallelism);
            if(idComponent!=null) {
                declarer.fieldsGrouping(idComponent, PrepareRequest.ID_STREAM, new Fields("request"));
            }
            if(i==0 && component.declarations.size()==0) {
                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
            } else {
                String prevId;
                if(i==0) {
                    prevId = PREPARE_ID;
                } else {
                    prevId = boltId(i-1);
                }
                for(InputDeclaration declaration: component.declarations) {
                    declaration.declare(prevId, declarer);
                }
            }
            if(i>0) {
                declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); 
            }
        }
        
        IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
        OutputFieldsGetter getter = new OutputFieldsGetter();
        lastBolt.declareOutputFields(getter);
        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
        if(streams.size()!=1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        String outputStream = streams.keySet().iterator().next();
        List<String> fields = streams.get(outputStream).get_output_fields();
        if(fields.size()!=2) {
            throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
        }

        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
                .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0)))
                .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
        i++;
        builder.setBolt(boltId(i), new ReturnResults())
                .noneGrouping(boltId(i-1));
        return builder.createTopology();
    }
    
    private static String boltId(int index) {
        return "bolt" + index;
    }
    
    private static class Component {
        public IRichBolt bolt;
        public int parallelism;
        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
        
        public Component(IRichBolt bolt, int parallelism) {
            this.bolt = bolt;
            this.parallelism = parallelism;
        }
    }
    
    private static interface InputDeclaration {
        public void declare(String prevComponent, InputDeclarer declarer);
    }
    
    private class InputDeclarerImpl implements LinearDRPCInputDeclarer {
        Component _component;
        
        public InputDeclarerImpl(Component component) {
            _component = component;
        }
        
        @Override
        public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.fieldsGrouping(prevComponent, fields);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer fieldsGrouping(final String streamId, final Fields fields) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.fieldsGrouping(prevComponent, streamId, fields);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer globalGrouping() {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.globalGrouping(prevComponent);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer globalGrouping(final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.globalGrouping(prevComponent, streamId);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer shuffleGrouping() {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.shuffleGrouping(prevComponent);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer shuffleGrouping(final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.shuffleGrouping(prevComponent, streamId);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer noneGrouping() {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.noneGrouping(prevComponent);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer noneGrouping(final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.noneGrouping(prevComponent, streamId);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer allGrouping() {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.allGrouping(prevComponent);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer allGrouping(final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.allGrouping(prevComponent, streamId);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer directGrouping() {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.directGrouping(prevComponent);
                }                
            });
            return this;
        }

        @Override
        public LinearDRPCInputDeclarer directGrouping(final String streamId) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(String prevComponent, InputDeclarer declarer) {
                    declarer.directGrouping(prevComponent, streamId);
                }                
            });
            return this;
        }
        
        private void addDeclaration(InputDeclaration declaration) {
            _component.declarations.add(declaration);
        }        
    }
}
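
For context, here is a minimal sketch of how this builder is typically driven in local mode. ExclaimBolt, the topology name, and the parallelism are made up for illustration, and LocalDRPC/LocalCluster are assumed from the surrounding codebase (exact class locations and signatures vary a bit between Storm versions). The one hard requirement the builder imposes is visible in declareOutputFields: a single output stream whose two fields are the request id and the result.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class ExclaimDRPCDemo {
    // hypothetical bolt: appends "!" to the request arguments
    public static class ExclaimBolt implements IBasicBolt {
        public void prepare(Map conf, TopologyContext context) {
        }

        public void execute(Tuple input, BasicOutputCollector collector) {
            // field 0 is the request id, field 1 the args string split out by PrepareRequest
            collector.emit(new Values(input.getValue(0), input.getString(1) + "!"));
        }

        public void cleanup() {
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // exactly one stream with exactly two fields, as createTopology requires
            declarer.declare(new Fields("id", "result"));
        }
    }

    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclaim");
        builder.addBolt(new ExclaimBolt(), 3);

        LocalDRPC drpc = new LocalDRPC();      // implements ILocalDRPC
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("exclaim-drpc", new Config(),
                               builder.createLocalTopology(drpc));

        // blocks until ReturnResults sends the answer back over the return info
        System.out.println(drpc.execute("exclaim", "hello"));  // expected: "hello!"

        cluster.shutdown();
        drpc.shutdown();
    }
}
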
Annotations (file:line, in walkthrough order):

CoordinatedBolt.java:57: Wrapper around OutputCollector that tracks where emitted tuples are sent.
CoordinatedBolt.java:98: Called when tuples are emitted; for each task that an emitted tuple is sent to, increment the number of tuples "in play" for this request.
CoordinatedBolt.java:75: Increment the number of received tuples for this request on ack or fail.
CoordinatedBolt.java:113: A map from request_id to TrackingInfo. TimeCacheMap is useful because the tracking info is null and void if the request times out.
CoordinatedBolt.java:118: Total number of tuples that have been acked or failed for this request.
CoordinatedBolt.java:119: Map from task_id to the number of tuples that have been emitted to it.
CoordinatedBolt.java:136: Delegates all actions to the bolt it wraps.
CoordinatedBolt.java:146: All the component_ids that are subscribed to COORDINATED_STREAM_ID.
CoordinatedBolt.java:151: Assembles the list of all task_ids, across all components, that are subscribed to the COORDINATED_STREAM_ID stream from this bolt.
CoordinatedBolt.java:172: Tracking info for this request.
CoordinatedBolt.java:179: If the delegate implements FinishedCallback, call its finishedId method.
CoordinatedBolt.java:183: For each task subscribed in a coordinated way, send it the count of tuples this bolt has emitted to it for this request.
CoordinatedBolt.java:188: This bolt has received all the tuples it will receive for this request; it is finished, and there is no reason to keep tracking the request.
CoordinatedBolt.java:197: If we haven't seen any tuples for this request and are therefore not tracking it yet, set up a new tracking info for it.
CoordinatedBolt.java:206: If this is a bookkeeping tuple, it should not be handled by the delegate, but it should change the tracking state.
CoordinatedBolt.java:224: Not a bookkeeping tuple; send it on to the delegate.
CoordinatedBolt.java:237: In addition to all the streams/fields the delegate might have, we need a direct stream (COORDINATED_STREAM_ID) to send counts of tuples for a request id to downstream tasks.

LinearDRPCTopologyBuilder.java:26: The topology is linear, so this is just the list of component steps.
LinearDRPCTopologyBuilder.java:33: Just add the bolt to the list of steps; it will be added to the real topology builder when told to createTopology.
LinearDRPCTopologyBuilder.java:51: Helper for the case where we have a LocalDRPC object for our local cluster.
LinearDRPCTopologyBuilder.java:60: Creates the actual topology using a TopologyBuilder.
LinearDRPCTopologyBuilder.java:65: Our spout is set to the DRPCSpout that was passed in.
LinearDRPCTopologyBuilder.java:66: All DRPC requests first pass through PrepareRequest, which generates the request id.
LinearDRPCTopologyBuilder.java:74: If this is the first step, it has no source.
LinearDRPCTopologyBuilder.java:76: If this is the second step, it has one source.
LinearDRPCTopologyBuilder.java:78: Otherwise, it has all sources, which causes CoordinatedBolt's _numSourceReports to be calculated differently.
CoordinatedBolt.java:155: If there is a single source, _numSourceReports is 1.
CoordinatedBolt.java:159: Otherwise we go off the number of component tasks from the previous step.
CoordinatedBolt.java:161: Pretty sure this makes sure that the final step in the topology ignores the tuples coming off the ID_STREAM of PrepareRequest.
LinearDRPCTopologyBuilder.java:81: If this is the final step and it implements FinishedCallback, it will expect to receive a stream of ids from PrepareRequest.
LinearDRPCTopologyBuilder.java:84: All bolts in the topology need to be wrapped by CoordinatedBolt.
LinearDRPCTopologyBuilder.java:88: If this is the final step and it is a FinishedCallback and expects ids from PrepareRequest, it needs to be hooked up to the ID_STREAM from PrepareRequest.
LinearDRPCTopologyBuilder.java:91: If this is the first step, it needs to be hooked up to the arguments that were sent with the DRPC request and then separated out by PrepareRequest.
LinearDRPCTopologyBuilder.java:93: Otherwise, find the previous step (or PrepareRequest if this is the first) and apply the groupings that were declared on the step, so that output from the previous step flows over those groupings to this step.
LinearDRPCTopologyBuilder.java:104: If this is not the first step, it needs to receive the count of tuples for a request id over COORDINATED_STREAM_ID.
LinearDRPCTopologyBuilder.java:109: The last bolt gets special treatment.
LinearDRPCTopologyBuilder.java:113: Linear DRPC topologies require that the final bolt have only one output stream.
LinearDRPCTopologyBuilder.java:118: It is also important that the only fields from the last bolt be a request_id and the result (which is going to get brutally cast to a string).
LinearDRPCTopologyBuilder.java:122: After the final step, you need to join the result for the request to the return info. JoinResult groups on request id and subscribes to the output of the last step as well as the return info stream from PrepareRequest.
LinearDRPCTopologyBuilder.java:126: The ReturnResults bolt simply sends the result to the client that is reachable from the return_info. This is when your DRPC client receives the result of the computation.
LinearDRPCTopologyBuilder.java:128: And we're done. Notice how this is just a normal Storm topology built on top of standard primitives, with a significant amount of cleverness in the bolt implementation.

CoordinatedBolt.java:206: If this is the final step and this tuple is coming from PrepareRequest over its ID_STREAM...
CoordinatedBolt.java:209: ...we can say that we have received the request id, and we should check whether it is finished.
CoordinatedBolt.java:214: If this is a message from an upstream task over the COORDINATED stream, then we should expect another 'count' tuples and can also rest assured that the task is done and has reported in.
CoordinatedBolt.java:221: And we should of course also check whether this thing is finished.
CoordinatedBolt.java:200: If we are not supposed to receive a stream of ids from PrepareRequest, then we can act as though we already received the id (which of course we never will).
CoordinatedBolt.java:173: This is obviously not finished if we weren't even tracking it.
CoordinatedBolt.java:174: If this is the last step, we need to have received the request id from PrepareRequest before this can be considered complete.
CoordinatedBolt.java:175: If we don't have a source, we're complete now.
CoordinatedBolt.java:177: If we do have a source, we need to make sure that all of our upstream tasks have reported in...
CoordinatedBolt.java:178: ...and that the number of tuples we were told to expect by our upstream tasks matches the number of tuples we, in fact, received (see the sketch after this list).
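
Finally, a self-contained illustration of the completion check in checkFinishId, using made-up numbers for a step whose previous step has three tasks (SourceArgs.all()). It mirrors the condition in the code above rather than calling into it.

public class CompletionCheckIllustration {
    public static void main(String[] args) {
        // with SourceArgs.all(), _numSourceReports is the number of tasks in the previous step
        int numSourceReports = 3;

        // suppose each of the three upstream tasks emitted two tuples to this task and then
        // reported that count over COORDINATED_STREAM_ID
        int reportCount = 3;                  // TrackingInfo.reportCount
        int expectedTupleCount = 2 + 2 + 2;   // TrackingInfo.expectedTupleCount
        int receivedTuples = 6;               // TrackingInfo.receivedTuples (acked/failed by the delegate)

        boolean finished = reportCount == numSourceReports
                && expectedTupleCount == receivedTuples;

        // once this holds (and, for the last step, the id has arrived over ID_STREAM),
        // CoordinatedBolt calls finishedId, forwards its own emit counts downstream,
        // and drops the request from _tracked
        System.out.println("request finished: " + finished);  // prints: request finished: true
    }
}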