This blog post is co-authored with Donnie Kerr, Principal Technology Architect at Centric Consulting
Introduction
In today’s fast-paced technological landscape, organizations are continuously exploring ways to use real-time data streaming and artificial intelligence (AI) to drive productivity and streamline processes. This blog post presents the Collaborative Intelligence Platform (CIP), a concept from our partner Centric Consulting that supports decision-making by integrating AI, private enterprise data, and human guidance. Central to CIP are Waterstream Cloud, Confluent Cloud, and large language models (LLMs) like ChatGPT.
From Waterstream to Waterstream Cloud
Waterstream was created as a straightforward yet sophisticated solution to bridge the gap between the Internet of Things (IoT) world, which utilizes MQTT, and the Apache Kafka® data streaming ecosystem. Despite its conceptual simplicity, Waterstream offers a multitude of configurations that users can customize. These configurations include mapping MQTT topics to Kafka topics and establishing authorization and authentication settings. Even seasoned DevOps professionals may require time to grasp and manage the details.
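To make the idea of topic mapping concrete, here is a minimal Python sketch of how MQTT topic filters could be resolved to Kafka topics. It is not Waterstream's configuration syntax or implementation: the rule list, the Kafka topic names, and the function names are all hypothetical. Only the wildcard semantics (`+` for a single level, `#` for all remaining levels) come from the MQTT specification, and the sketch ignores the special handling of topics that start with `$`.

```python
# Hypothetical mapping rules: (MQTT topic filter, Kafka topic). First match wins.
MAPPING_RULES = [
    ("sensors/+/temperature", "iot_temperature"),
    ("sensors/#", "iot_sensors"),
]
# Hypothetical fallback topic for MQTT topics that match no rule.
DEFAULT_KAFKA_TOPIC = "mqtt_messages"


def matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # No multi-level wildcard was seen, so the depths must be equal.
    return len(filter_levels) == len(topic_levels)


def kafka_topic_for(mqtt_topic: str) -> str:
    """Pick the Kafka topic of the first rule whose filter matches."""
    for topic_filter, kafka_topic in MAPPING_RULES:
        if matches(topic_filter, mqtt_topic):
            return kafka_topic
    return DEFAULT_KAFKA_TOPIC
```

Because the first matching rule wins, the more specific filter has to come before the broader `sensors/#` rule in this sketch.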
To address this, Waterstream has evolved from being available solely as a Docker image with accompanying scripts to a software-as-a-service platform called Waterstream Cloud. Currently, Waterstream Cloud is accessible only by invitation (contact us if you want to be added to the waiting list), but it will soon transition to General Availability. Through a user-friendly interface, users can create one or more Waterstream clusters in a specific AWS region and connect them to existing Confluent or open source Kafka clusters with just a few clicks.
Once the connection to Kafka is set up, users can focus more on defining the business case, determining how Waterstream maps MQTT topics to Kafka topics, and establishing the appropriate authorization and authentication rules. Waterstream Cloud further enhances the user experience by integrating a straightforward UI, enabling users to send and receive messages from the Waterstream Cluster or any MQTT broker, provided that the cluster has WebSocket functionality enabled. Additionally, if the Kafka cluster is temporarily unavailable or inaccessible, users can still test Waterstream by creating a single-node sidecar Kafka cluster that allows Waterstream to operate. However, it’s important to note that the sidecar Kafka cluster is not designed for production use and cannot be accessed by external producers or consumers.
Waterstream Cloud and Confluent Cloud, a perfect match
Waterstream Cloud simplifies the creation of a Waterstream MQTT broker, and it pairs perfectly with Confluent Cloud. Confluent Cloud is a fully managed, cloud-native, and complete data streaming platform offered by Confluent, the company founded by the original creators of Apache Kafka. It provides a scalable and reliable infrastructure for developing real-time data streaming applications and implementing event-driven architectures in the cloud.
By leveraging Confluent Cloud, users can harness the power of Kafka without the burden of managing the underlying infrastructure. To facilitate the seamless integration of Waterstream Cloud and Confluent Cloud, a dedicated configuration form is available. This form includes the necessary Kafka parameters from Confluent Cloud, making it effortless for users to connect the two platforms. Additionally, Waterstream offers the option to validate the payload of incoming MQTT messages, formatted as JSON, using the fully managed Schema Registry included in Confluent Cloud. It is worth mentioning that the integration of Waterstream with the Confluent Schema Registry is a powerful combination that brings unified validation for MQTT messages and Kafka records. This integration simplifies development because it ensures consistent validation rules, and enhances interoperability between MQTT-based devices and Kafka-based services.
Here is a short video showcasing the quick and straightforward process of starting a Waterstream Broker with Waterstream Cloud. It highlights the seamless connection to Confluent Cloud, enabling users to send MQTT messages directly from their browser, which are then written into Kafka.
The Collaborative Intelligence Platform
At this pivotal moment, generative AI technologies offer the potential for remarkable productivity gains that organizations desire. However, only 27% of companies currently use AI tools, despite the 90% higher productivity reported by AI users. To foster the success of generative AI in the workplace, it must operate in a trusted environment where users have control of their own data and unwavering confidence in the relevance of its outputs.
Donnie Kerr, Principal Technology Architect at Centric Consulting, has been exploring the capabilities of this approach and introduced the paradigm of Collaborative Intelligence as a new methodology:
“Collaborative Intelligence leverages AI to mature a business process from little-to-no automation up to fully autonomous or somewhere in between. The end state of each process will vary depending on the goal. In some cases, a process can become fully autonomous, not requiring any human intervention. In other cases, a process may be very manual at first.”
The Collaborative Intelligence Platform (CIP) seamlessly incorporates cutting-edge technologies essential for the comprehensive implementation of the CI methodology. The platform architecture harmoniously integrates prominent cloud-native technologies, as depicted below:
To illustrate a practical application of CIP, let’s consider an example. Imagine an operator working in a logistics center. From real-world data, the shipment system detects a delay of 30 minutes, and the operator receives a notification through a conversational user interface powered by ChatGPT. The notification informs the operator that a shipment they are responsible for will be late. The operator is then prompted to decide whether they will handle the delayed shipment or whether it should be reassigned to the next shift. As time progresses, the AI learns from patterns and identifies that parcels originating from a specific location often encounter delays, so it starts reassigning them to the subsequent shift automatically.
The core essence of CIP lies in the ability to combine an AI platform with private enterprise data in real time, along with user decisions. In this scenario, Confluent Cloud plays a vital role by merging real-world events, user actions, and AI-generated outcomes. This integration enables a smooth and dynamic exchange of information, leading to improved collaboration and decision-making within the CIP framework, while letting the human control the full process.
Let’s delve into the technical workflow of the Collaborative Intelligence Platform (CIP) by examining a simple Proof of Concept (PoC) that highlights its essential components. This PoC leverages Waterstream Cloud and Confluent Cloud. Both technologies are built around topics: MQTT topics in Waterstream and Kafka topics in Confluent Cloud. In this PoC, corresponding MQTT and Kafka topics share the same name to simplify the mapping.
The process starts in the UI, where users make information requests using ChatGPT prompts. To streamline testing, the UI offers some examples to get the user started.
Within the UI front end, an MQTT client communicates with Waterstream Cloud to create new chats, send the user’s prompt, and remember the conversation history. Anyone who has experimented with the ChatGPT APIs knows that they don’t maintain context between calls. Your application needs to manage that, and we think of this in terms of state.
Usually, the app will have to send the call with the initial prompt payload (plus some hidden prompt engineering) and get the response, then send the initial prompt, plus the response, plus the new prompt for ChatGPT. This means incrementally larger payloads and slower response times as you iterate. Not to mention, the history is trapped in that session or state. But, if we use Confluent Cloud and ksqlDB, for example, we can preserve both state and history.
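The growth of the payload is easy to see in a small Python sketch. It assumes the role/content message format used by chat completion APIs; the function names and the idea of measuring the size as serialized JSON are our own illustration, not part of the PoC, and no API call is made.

```python
import json
from typing import List, Tuple


def build_messages(system_prompt: str,
                   history: List[Tuple[str, str]],
                   new_prompt: str) -> List[dict]:
    """Rebuild the full message list that must be sent on every call."""
    # The hidden prompt engineering goes first, as a system message.
    messages = [{"role": "system", "content": system_prompt}]
    # Every earlier prompt and response has to be replayed to keep context.
    for user_text, assistant_text in history:
        messages.append({"role": "user", "content": user_text})
        messages.append({"role": "assistant", "content": assistant_text})
    messages.append({"role": "user", "content": new_prompt})
    return messages


def payload_sizes(system_prompt: str, turns: List[Tuple[str, str]]) -> List[int]:
    """Return the serialized payload size (in characters) for each turn."""
    history: List[Tuple[str, str]] = []
    sizes = []
    for user_text, assistant_text in turns:
        messages = build_messages(system_prompt, history, user_text)
        sizes.append(len(json.dumps(messages)))
        # The response only becomes part of the history after the call.
        history.append((user_text, assistant_text))
    return sizes
```

Each turn adds two more messages to the next request, so the sizes returned by `payload_sizes` increase with every iteration. The history also lives only in the `history` variable of that session, which is the limitation that persisting the conversation in Kafka addresses.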
In our case, ksqlDB is used to load the user’s chat history and the ongoing conversation flow within each chat. This also allows for opening previous chats and continuing the conversation in real time. You could add a tumbling window so that the stream presented by ksqlDB contains only the most recent chats, or only those within a specific retention window. You could also use ksqlDB to present a single materialized view of the user’s prompt template. For example, a control operator would ask different questions than a diagnostician or machine programmer. In any of these cases, the same streams are persisted in Kafka for learning and training the back-end models, the results of which can then be merged back into these contexts and get better over time.
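In ksqlDB a tumbling window is declared as part of a query, but the underlying idea is simple enough to sketch in plain Python: time is cut into fixed, non-overlapping buckets, and each event belongs to exactly one bucket. The one-hour window size, the event shape, and the function names below are assumptions made for illustration only.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Hypothetical window size: one hour, in milliseconds.
WINDOW_MS = 60 * 60 * 1000


def window_start(timestamp_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Return the start of the tumbling window that contains the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


def count_per_window(events: Iterable[Tuple[str, int]],
                     size_ms: int = WINDOW_MS) -> Dict[Tuple[str, int], int]:
    """Count chat messages per (user_id, window_start) bucket.

    Each event is a (user_id, timestamp_ms) pair.
    """
    counts: Dict[Tuple[str, int], int] = defaultdict(int)
    for user_id, timestamp_ms in events:
        counts[(user_id, window_start(timestamp_ms, size_ms))] += 1
    return dict(counts)
```

Keeping only the bucket with the latest start time per user would give a view of the most recent chats.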
Waterstream Cloud writes new chats directly into a “Chats” topic for each user, and writes user prompts, sent as MQTT messages, directly into a Confluent Cloud Kafka topic named “chatMessages”, with no intermediaries needed. From there, the platform leverages ksqlDB stream processing to orchestrate the flow of events.
ksqlDB and MQTT topics are a match made in heaven. The MQTT topic structure is ideal for routing flows of events from devices, the UI, and the AI assistant into various ksqlDB stream pathways, depending on the topic and/or the value of the message. For example, the ChatGPT app supports Azure OpenAI GPT-3.5-turbo or OpenAI GPT-4. A single prompt gets routed from the “chatMessages/{chatId}” MQTT topic to an Azure Function Sink connector for either Azure OpenAI or OpenAI. “ChatMessages_stream_aoai” and “ChatMessages_stream_oai” are the input streams that trigger the API calls.
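The routing decision can be sketched in a few lines of Python. In the PoC this logic lives in ksqlDB streams, not in application code; the `provider` field, its values, and the default shown here are hypothetical, while the topic pattern and the two stream names are the ones mentioned above.

```python
from typing import Tuple

# Stream names from the PoC; the provider keys are hypothetical.
STREAM_BY_PROVIDER = {
    "aoai": "ChatMessages_stream_aoai",  # Azure OpenAI
    "oai": "ChatMessages_stream_oai",    # OpenAI
}


def route(mqtt_topic: str, message: dict) -> Tuple[str, str]:
    """Return (target stream, chat ID) for a prompt published over MQTT."""
    prefix, _, chat_id = mqtt_topic.partition("/")
    if prefix != "chatMessages" or not chat_id:
        raise ValueError(f"not a chat message topic: {mqtt_topic!r}")
    # Assumption: the message value carries the provider; default to OpenAI.
    provider = message.get("provider", "oai")
    if provider not in STREAM_BY_PROVIDER:
        raise ValueError(f"unknown provider: {provider!r}")
    return STREAM_BY_PROVIDER[provider], chat_id
```

For example, `route("chatMessages/42", {"provider": "aoai"})` selects the Azure OpenAI stream for chat `42`.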
Similarly, to make the prompt aware of the enterprise context, chatMessages can be joined with data obtained from private data streams using ksqlDB. The MQTT topic effectively steers the prompt to be appended with the correct private data source(s). These private streams can be generated using connectors available in Confluent Cloud. The augmented prompt topic is then forwarded to ChatGPT for processing.
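Conceptually, this augmentation is a join between the stream of prompts and a table of private records. The Python sketch below mimics a left join on a key; the `shipmentId` field, the record contents, and the way the context is prepended to the prompt are all hypothetical and only illustrate the idea, not the actual ksqlDB queries used in the PoC.

```python
from typing import Dict

# Hypothetical private data, keyed by shipment ID (latest record per key).
SHIPMENTS: Dict[str, dict] = {
    "SHP-1": {"origin": "Milan", "delayMinutes": 30},
}


def augment_prompt(event: dict, table: Dict[str, dict]) -> dict:
    """Join a prompt event with its private record and prepend it as context."""
    record = table.get(event.get("shipmentId"))
    if record is None:
        # Left-join behavior: events without a match pass through unchanged.
        return dict(event)
    context = ", ".join(f"{key}={value}" for key, value in sorted(record.items()))
    augmented = dict(event)
    augmented["prompt"] = f"Context: {context}\n\n{event['prompt']}"
    return augmented
```

The original event is never mutated, so the raw prompt and the augmented prompt can be kept as separate records.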
ChatGPT generates responses in the form of chunks, which are groups of one or more words. These chunks are stored in a stream called “chatgptResponseStream” as they are generated.
At this stage, Waterstream Cloud plays a role once again by transmitting the answer chunks from the Kafka topic as messages to a specific MQTT topic. The frontend client subscribes to the MQTT topic “chatgptResponseStream.” As ChatGPT generates responses, the UI dynamically updates to display the words in real time. This allows users to observe the gradual construction of the response, with new words appearing on the screen, similar to the familiar OpenAI application. It is worth mentioning that other topics are used in this PoC to save chat history and user information, but aren’t discussed in detail here.
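On the client side, displaying the answer as it is generated comes down to appending each incoming chunk to the text shown so far. The following Python sketch shows that accumulation logic on its own, without an MQTT client. The class name and the assumption that each message carries a chat ID and a text chunk are ours, since the actual message format of the PoC is not described here.

```python
from typing import Dict


class ResponseAssembler:
    """Accumulates response chunks per chat, in arrival order."""

    def __init__(self) -> None:
        self._texts: Dict[str, str] = {}

    def on_chunk(self, chat_id: str, chunk: str) -> str:
        """Append a chunk and return the text to display so far."""
        self._texts[chat_id] = self._texts.get(chat_id, "") + chunk
        return self._texts[chat_id]

    def text(self, chat_id: str) -> str:
        """Return the accumulated text for a chat (empty if none yet)."""
        return self._texts.get(chat_id, "")
```

A message handler subscribed to the response topic would call `on_chunk` for every message and re-render the chat with the returned text.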
One of the advantages offered by Waterstream is the ability to write directly from the UI into Kafka and vice versa. ChatGPT chunks and other events can be easily communicated to the UI as notifications. Utilizing an MQTT broker instead of plain WebSockets enables the organization of data flow into multiple streams, simplifying the creation of a reactive UI. Moreover, Waterstream Cloud provides authentication and authorization capabilities, which are crucial for enterprise applications. User information can also be utilized to organize MQTT topics, leading to a well-structured data system.
For further details about the Collaborative Intelligence Platform (CIP), please refer to the “Collaborative Intelligence: An AI-Powered System That Puts Humans in Control” white paper available on the Centric Consulting website. Another interesting reading on these themes is a blog post titled “GPT-4 + Streaming Data = Real-Time Generative AI” by Michael Drogalis, Principal Technologist at Confluent.
Conclusion
In this blog post, we discussed the journey from Waterstream to Waterstream Cloud, the integration with Confluent Cloud, and the idea and implementation of a Collaborative Intelligence Platform (CIP). Together, these technologies form a unified system that combines user prompts, actions, and private enterprise data to form Collaborative Intelligence. Although CIP is still in its early stages, it holds great potential by using a streaming data approach to merge real-world events, user knowledge, control, and AI assistance. This integrated approach shows promise for enhancing collaboration and decision-making processes while keeping the human at the center.