kafka producer客户端
KafkaProducer的send方法:
1.等待kafka要发送的topic的partition都在线 2.序列化key,value; key:org.apache.kafka.common.serialization.IntegerSerializer value:org.apache.kafka.common.serialization.StringSerializer 3.根据发送数据计算索要发送的topic的partition 使用record记录中的partition,若为空,用paritition类计算 partition:org.apache.kafka.clients.producer.internals.DefaultPartitioner 4.确保所要发送的信息的序列化大小不超过阈值 阈值:MAX_REQUEST_SIZE_CONFIG = "max.request.size" BUFFER_MEMORY_CONFIG = "buffer.memory" 5.实例化topic的partition,实例化发送对象result,添加accumulator中的topic队列中 封装为writable records,包含compresor压缩,再封装为batch 压缩参数:COMPRESSION_TYPE_CONFIG = "compression.type"; 6.查看result的batch是否或是新建的,则唤醒sener发送消息 7.返回result的future
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { try { // @1 first make sure the metadata for the topic is available waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); byte[] serializedKey; try { // @2 序列化key serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } byte[] serializedValue; try { //序列化value serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } //@3 计算partition int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize);//@4 确保发送请求不超过阈值 TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); //@5 发送封装好的对象 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); if (result.batchIsFull || result.newBatchCreated) {//@6 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;//@7 // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); throw new InterruptException(e); } catch (KafkaException e) { this.errors.record(); throw e; } }
其中第5步,添加到accumulator代码如下:
将record添加到topic的partition队列中,如果存在则添加;
如果不存在则创建队列,二次检查队列是否有值,如果有,则将record添加
如果没有,则封装writable records,包含compresor压缩
和batch类;
record.append的时候调用compressor进行压缩
存在与否都将当record添加到队列中,并且进行压缩(如果配置压缩)
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); try { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); // check if we have an in-progress batch Deque<RecordBatch> dq = dequeFor(tp); synchronized (dq) {//添加到已经存在的topic队列中 RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); //重复判断其他producer有没有放到dp中 RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } //没有当前topic的数据 //封装writable records,包含compresor压缩 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());//封装为recordBatch FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); } } finally { appendsInProgress.decrementAndGet(); } }
MemoryRecords
public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { if (!this.records.hasRoomFor(key, value)) {//mem溢出 return null; } else { this.records.append(0L, key, value);//添加到record中,又进行压缩操作 this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } }
在第6步,sender类发送消息run方法:
1.检查record是否准备好:
The record set is full The record set has sat in the accumulator for at least lingerMs milliseconds The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready). The accumulator has been closed
2.获取accumulator中所有数据
3. 生成request中
4.填入selector的client中
5.client selector nio发送数据
public void run(long now) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send 获得数据的leader RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update if (result.unknownLeadersExist) this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // create produce requests Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); //only for debug test // if(batches.size()>=1){ // System.out.println(batches.size()); // } sensors.updateProduceRequestMetrics(batches); List<ClientRequest> requests = createProduceRequests(batches, now);//生成request // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } for (ClientRequest request : requests) client.send(request); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; this.client.poll(pollTimeout, now);//nio 发送数据 }
相关推荐
人工智能-深度学习-tensorflow
人工智能毕业设计&课程设计
基于ssm的中小型企业财务管理录系统.zip
这个资源是一个基于Spring Boot和MySQL的洗衣店订单管理系统的完整源码。它包括了所有的源代码文件,以及一个详细的文档,可以帮助你理解和运行这个系统。这个系统的主要功能包括:用户注册和登录,下单,查看订单,修改订单,删除订单等。用户可以在系统中选择洗衣服务,然后提交订单。系统会自动计算订单的总价,并将其显示在用户的订单列表中。用户还可以查看自己的历史订单,以及每个订单的详细信息。此外,系统还包括了一个管理员模块。管理员可以查看所有的订单,以及对订单进行管理。他们可以修改订单的状态,例如将订单标记为已完成,或者取消订单。这个系统使用了Spring Boot框架,这是一个非常流行的Java开发框架,它可以帮助你快速地开发和部署应用程序。同时,系统也使用了MySQL数据库,这是一个广泛使用的关系型数据库,它可以存储大量的数据,并提供高效的查询功能。总的来说,这个资源是一个非常完整的洗衣店订单管理系统的源码,它可以帮助你理解如何使用Spring Boot和MySQL来开发一个实际的应用程序。无论你是正在学习Java编程,还是已经有一定的开发经验,都可以从这个资源中学到很多有用的知识和技能。
W9825G6KH-6I SDRAM,256Mb(32MB,16Mbx16),3.3v 动态随机存取存储器
Python库是一组预先编写的代码模块,旨在帮助开发者实现特定的编程任务,无需从零开始编写代码。这些库可以包括各种功能,如数学运算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。
人工智能毕业设计&课程设计
asp代码ASP基于web的学校新闻发布系统开发(论文+源代码+开题报告+文献综述+外文翻译)本资源系百度网盘分享地址
三菱PLC例程源码PLC 气压程式本资源系百度网盘分享地址
三菱PLC例程源码PLC通过RS485 对FR系列变频的控制本资源系百度网盘分享地址
基于ssm的学生档案管理系统.zip
这是一个基于Java语言开发的elfinder 2.x版本Web文件管理器后端设计,包含63个文件,其中主要文件类型包括49个Java源文件、3个XML文件、2个PNG图片文件、2个Markdown文档、1个gitattributes文件、1个gitignore文件、1个LICENSE文件、1个Properties文件、1个types文件和1个未知类型的文件。该项目提供了丰富的文件管理功能,包括自定义文件视图和自定义文件操作,为用户提供了高效、便捷的文件管理体验。
使用DS Client在PPT中动态展示分子三维结构
基于ssm+vue的汽车站车辆运管系统.zip
算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。
DS在生物药物领域的解决方案
三菱PLC例程源码SBR废水处理本资源系百度网盘分享地址
人工智能毕业设计&课程设计
人工智能-深度学习-tensorflow
基于ssm电子病历系统.zip